From a7916106fecb74d866f732094ce8bc6d806c2e64 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Fri, 5 Jul 2024 18:22:16 +0200 Subject: [PATCH 01/24] rework step --- miniconf_mqtt/src/lib.rs | 230 ++++++++++++++++++++------------------- 1 file changed, 119 insertions(+), 111 deletions(-) diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index 14c06825..0eaa52dd 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -6,6 +6,7 @@ //! The Minimq MQTT client for `miniconf``. use heapless::{String, Vec}; +use log::{error, info, warn}; use miniconf::{Error, JsonCoreSlash, NodeIter, Path, Traversal, TreeKey}; pub use minimq; use minimq::{ @@ -31,7 +32,7 @@ const REPUBLISH_TIMEOUT_SECONDS: u32 = 2; type Iter = NodeIter, '/'>>; mod sm { - use super::{Iter, TreeKey, REPUBLISH_TIMEOUT_SECONDS}; + use super::{TreeKey, REPUBLISH_TIMEOUT_SECONDS}; use minimq::embedded_time::{self, duration::Extensions, Instant}; use smlang::statemachine; @@ -44,9 +45,9 @@ mod sm { PendingSubscribe + Subscribed / start_republish_timeout = PendingRepublish, // Settings republish can be completed any time after subscription. - PendingRepublish + StartRepublish / start_republish = RepublishingSettings, - RepublishingSettings + StartRepublish / start_republish = RepublishingSettings, - Active + StartRepublish / start_republish = RepublishingSettings, + PendingRepublish + StartRepublish = RepublishingSettings, + RepublishingSettings + StartRepublish = RepublishingSettings, + Active + StartRepublish = RepublishingSettings, // After republishing settings, we are in an idle "active" state. RepublishingSettings + Complete = Active, @@ -56,18 +57,16 @@ mod sm { } } - pub struct Context, const Y: usize> { + pub struct Context { clock: C, timeout: Option>, - pub republish_state: Iter, } - impl, const Y: usize> Context { + impl Context { pub fn new(clock: C) -> Self { Self { clock, timeout: None, - republish_state: M::nodes(), } } @@ -80,52 +79,69 @@ mod sm { } } - impl, const Y: usize> StateMachineContext - for Context - { + impl StateMachineContext for Context { fn start_republish_timeout(&mut self) { self.timeout .replace(self.clock.try_now().unwrap() + REPUBLISH_TIMEOUT_SECONDS.seconds()); } - - fn start_republish(&mut self) { - self.republish_state = M::nodes(); - } } } enum Command<'a> { - List, + List { path: &'a str }, Get { path: &'a str }, Set { path: &'a str, value: &'a [u8] }, } impl<'a> Command<'a> { - fn from_message(topic: &'a str, value: &'a [u8]) -> Result { - let path = topic.strip_prefix('/').unwrap_or(topic); - - if path == "list" { - Ok(Command::List) + fn try_from_message(topic: &'a str, value: &'a [u8]) -> Result { + if topic == "/list" { + let path = core::str::from_utf8(value).or(Err(()))?; + Ok(Command::List { path }) } else { - match path.strip_prefix("settings") { - Some(path) => { - if value.is_empty() { - Ok(Command::Get { path }) - } else { - Ok(Command::Set { path, value }) - } - } - _ => Err(()), + let path = topic.strip_prefix("/settings").ok_or(())?; + if value.is_empty() { + Ok(Command::Get { path }) + } else { + Ok(Command::Set { path, value }) } } } } -struct ListCache { +/// Cache correlation data and topic for multi-part responses. +struct ResponseCache { topic: String, correlation_data: Option>, } +impl TryFrom<&minimq::types::Properties<'_>> for ResponseCache { + type Error = &'static str; + fn try_from(value: &minimq::types::Properties<'_>) -> Result { + let topic = value + .into_iter() + .response_topic() + .ok_or("No response topic")? + .try_into() + .or(Err("Response topic too long"))?; + let correlation_data = value + .into_iter() + .find_map(|prop| { + if let Ok(minimq::Property::CorrelationData(cd)) = prop { + Some(Vec::try_from(cd.0)) + } else { + None + } + }) + .transpose() + .or(Err("Correlation data too long"))?; + Ok(Self { + topic, + correlation_data, + }) + } +} + /// MQTT settings interface. /// /// # Design @@ -172,9 +188,10 @@ where Broker: minimq::Broker, { mqtt: minimq::Minimq<'buf, Stack, Clock, Broker>, - state: sm::StateMachine>, + state: sm::StateMachine>, prefix: String, - listing_state: Option<(ListCache, Iter)>, + iter: Iter, + listing: Option, } impl<'buf, Settings, Stack, Clock, Broker, const Y: usize> @@ -216,25 +233,26 @@ where Ok(Self { mqtt, state: sm::StateMachine::new(sm::Context::new(clock)), + iter: Settings::nodes(), prefix, - listing_state: None, + listing: None, }) } fn handle_listing(&mut self) { - let Some((cache, iter)) = &mut self.listing_state else { + let Some(cache) = &mut self.listing else { return; }; while self.mqtt.client().can_publish(QoS::AtLeastOnce) { // Note(unwrap): Publishing should not fail because `can_publish()` was checked before // attempting this publish. - let (code, path) = match iter.next() { + let (code, path) = match self.iter.next() { Some(path) => (ResponseCode::Continue, path.unwrap().0.into_inner()), None => (ResponseCode::Ok, String::new()), }; - let props = [code.as_user_property()]; + let props = [code.into()]; let mut outgoing = Publication::new(path.as_bytes()) .topic(&cache.topic) .properties(&props) @@ -248,8 +266,8 @@ where Ok(response) => response, Err(e) => { // Something went wrong. Abort the listing. - log::error!("Listing failed to build response: {e:?}"); - self.listing_state.take(); + error!("Listing failed to build response: {e:?}"); + self.listing.take(); return; } }; @@ -259,15 +277,19 @@ where // If we're done with listing, bail out of the loop. if code != ResponseCode::Continue { - self.listing_state.take(); + self.listing.take(); break; } } } fn handle_republish(&mut self, settings: &Settings) { + if self.listing.is_some() { + return; + } + while self.mqtt.client().can_publish(QoS::AtMostOnce) { - let Some(path) = self.state.context_mut().republish_state.next() else { + let Some(path) = self.iter.next() else { // If we got here, we completed iterating over the topics and published them all. self.state.process_event(sm::Events::Complete).unwrap(); break; @@ -305,7 +327,7 @@ where .publish( Publication::new(b"") .topic(&prefixed_topic) - .properties(&[ResponseCode::Error.as_user_property()]) + .properties(&[ResponseCode::Error.into()]) .finish() .unwrap(), ) @@ -317,7 +339,7 @@ where } fn handle_subscription(&mut self) { - log::info!("MQTT connected, subscribing to settings"); + info!("MQTT connected, subscribing to settings"); // Note(unwrap): We construct a string with two more characters than the prefix // structure, so we are guaranteed to have space for storage. @@ -400,46 +422,47 @@ where settings: &mut Settings, ) -> Result> { let mut updated = false; - let poll = self.mqtt.poll(|client, topic, message, properties| { - let Some(path) = topic.strip_prefix(self.prefix.as_str()) else { - log::info!("Unexpected topic prefix: {topic}"); + let poll = self.mqtt.poll(|client, topic, payload, properties| { + let Some(topic) = topic.strip_prefix(self.prefix.as_str()) else { + info!("Unexpected topic prefix: {topic}"); return; }; - let Ok(command) = Command::from_message(path, message) else { - log::info!("Unknown miniconf command: {path}"); + let Ok(command) = Command::try_from_message(topic, payload) else { + info!("Unknown miniconf command: {topic}"); return; }; match command { - Command::List => { + Command::List { path } => { if !properties .into_iter() .any(|prop| matches!(prop, Ok(minimq::Property::ResponseTopic(_)))) { - log::info!("Discarding `List` without `ResponseTopic`"); + info!("Discarding `List` without `ResponseTopic`"); return; } - let response = match self.listing_state { - Some(_) => "`List` already in progress", - None => { - match handle_listing_request(properties) { - Err(msg) => msg, - Ok(cache) => { - self.listing_state.replace((cache, Settings::nodes())); - - // There is no positive response sent during list commands, - // instead, the response is sent as a property of the listed - // elements. As such, we are now finished processing a list - // command. - return; - } + let response = if self.listing.is_some() { + "`List` already in progress" + } else { + match ResponseCache::try_from(properties) { + Err(msg) => msg, + Ok(cache) => { + self.listing.replace(cache); + self.iter = + Settings::nodes().root(&Path::<_, '/'>::from(path)).unwrap(); + + // There is no positive response sent during list commands, + // instead, the response is sent as a property of the listed + // elements. As such, we are now finished processing a list + // command. + return; } } }; - let props = [ResponseCode::Error.as_user_property()]; + let props = [ResponseCode::Error.into()]; if let Ok(response) = minimq::Publication::new(response.as_bytes()) .reply(properties) .properties(&props) @@ -451,7 +474,7 @@ where } Command::Get { path } => { - let props = [ResponseCode::Ok.as_user_property()]; + let props = [ResponseCode::Ok.into()]; let Ok(message) = DeferredPublication::new(|buf| settings.get_json(path, buf)) .properties(&props) .reply(properties) @@ -465,20 +488,30 @@ where return; }; - if let Err(minimq::PubError::Serialization(err)) = client.publish(message) { - if let Ok(message) = DeferredPublication::new(|mut buf| { - let start = buf.len(); - write!(buf, "{}", err).and_then(|_| Ok(start - buf.len())) - }) - .properties(&[ResponseCode::Error.as_user_property()]) - .reply(properties) - .qos(QoS::AtLeastOnce) - .finish() - { - // Try to send the error as a best-effort. If we don't have enough - // buffer space to encode the error, there's nothing more we can do. - client.publish(message).ok(); - }; + match client.publish(message) { + Ok(()) => {} + Err(err) => {} + Err(minimq::PubError::Serialization(miniconf::Error::Traversal( + Traversal::TooShort(depth), + ))) => { + // Internal node + // TODO: iter update + } + Err(minimq::PubError::Serialization(err)) => { + if let Ok(message) = DeferredPublication::new(|mut buf| { + let start = buf.len(); + write!(buf, "{}", err).and_then(|_| Ok(start - buf.len())) + }) + .properties(&[ResponseCode::Error.into()]) + .reply(properties) + .qos(QoS::AtLeastOnce) + .finish() + { + // Try to send the error as a best-effort. If we don't have enough + // buffer space to encode the error, there's nothing more we can do. + client.publish(message).ok(); + }; + } } } @@ -486,7 +519,7 @@ where Ok(_depth) => { updated = true; if let Ok(response) = Publication::new("OK".as_bytes()) - .properties(&[ResponseCode::Ok.as_user_property()]) + .properties(&[ResponseCode::Ok.into()]) .reply(properties) .qos(QoS::AtLeastOnce) .finish() @@ -499,7 +532,7 @@ where let start = buf.len(); write!(buf, "{}", err).and_then(|_| Ok(start - buf.len())) }) - .properties(&[ResponseCode::Error.as_user_property()]) + .properties(&[ResponseCode::Error.into()]) .reply(properties) .qos(QoS::AtLeastOnce) .finish() @@ -513,7 +546,7 @@ where match poll { Ok(_) => Ok(updated), Err(minimq::Error::SessionReset) => { - log::warn!("Session reset"); + warn!("Session reset"); self.state.process_event(sm::Events::Reset).unwrap(); Ok(false) } @@ -538,9 +571,9 @@ enum ResponseCode { Error, } -impl ResponseCode { - const fn as_user_property(self) -> minimq::Property<'static> { - let string = match self { +impl From for minimq::Property<'static> { + fn from(value: ResponseCode) -> Self { + let string = match value { ResponseCode::Ok => "Ok", ResponseCode::Continue => "Continue", ResponseCode::Error => "Error", @@ -552,28 +585,3 @@ impl ResponseCode { ) } } - -fn handle_listing_request( - properties: &minimq::types::Properties<'_>, -) -> Result { - // If the response topic is too long, send an error - let response_topic = properties.into_iter().response_topic().unwrap(); - - // If there is a CD and it's too long, send an error response. - let correlation_data = if let Some(cd) = properties.into_iter().find_map(|prop| { - if let Ok(minimq::Property::CorrelationData(cd)) = prop { - Some(cd.0) - } else { - None - } - }) { - Some(Vec::try_from(cd).map_err(|_| "Correlation data too long")?) - } else { - None - }; - - Ok(ListCache { - topic: String::try_from(response_topic).map_err(|_| "Response topic too long")?, - correlation_data, - }) -} From 4cb41c5905b99e78cb94d5ca08cf4ab5ef6e096f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Fri, 5 Jul 2024 18:22:30 +0200 Subject: [PATCH 02/24] rework overall statemachine and layout --- miniconf_cli/Cargo.toml | 2 +- miniconf_mqtt/Cargo.toml | 2 +- miniconf_mqtt/src/lib.rs | 383 +++++++++++++++++++++------------------ 3 files changed, 213 insertions(+), 174 deletions(-) diff --git a/miniconf_cli/Cargo.toml b/miniconf_cli/Cargo.toml index 3f87b1f9..7eab86b2 100644 --- a/miniconf_cli/Cargo.toml +++ b/miniconf_cli/Cargo.toml @@ -19,7 +19,7 @@ miniconf = { version = "0.11.0", path = "../miniconf", features = [ ] } postcard = "1.0.8" serde-json-core = "0.5.1" -yafnv = "1.0.0" +yafnv = "2.0.0" [features] std = [] diff --git a/miniconf_mqtt/Cargo.toml b/miniconf_mqtt/Cargo.toml index d84807b6..9c499c5b 100644 --- a/miniconf_mqtt/Cargo.toml +++ b/miniconf_mqtt/Cargo.toml @@ -18,7 +18,7 @@ std = [] [dependencies] miniconf = { version = "0.11", features = ["json-core"], default-features = false, path = "../miniconf" } minimq = "0.9.0" -smlang = "0.6" +smlang = "0.7" embedded-io = "0.6" log = "0.4" heapless = "0.8" diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index 0eaa52dd..aa9eefe3 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -7,13 +7,13 @@ use heapless::{String, Vec}; use log::{error, info, warn}; -use miniconf::{Error, JsonCoreSlash, NodeIter, Path, Traversal, TreeKey}; +use miniconf::{IntoKeys, JsonCoreSlash, NodeIter, Path, Traversal, TreeKey}; pub use minimq; use minimq::{ embedded_nal::TcpClientStack, embedded_time, types::{SubscriptionOptions, TopicFilter}, - DeferredPublication, Publication, QoS, + ConfigBuilder, DeferredPublication, ProtocolError, Publication, QoS, }; use embedded_io::Write; @@ -29,31 +29,62 @@ const MAX_CD_LENGTH: usize = 32; // republished. const REPUBLISH_TIMEOUT_SECONDS: u32 = 2; -type Iter = NodeIter, '/'>>; +const SEPARATOR: char = '/'; + +type Iter = NodeIter, SEPARATOR>>; + +pub enum Error { + Miniconf(miniconf::Error<()>), + State(sm::Error), + Minimq(minimq::Error), +} + +impl From for Error { + fn from(value: sm::Error) -> Self { + Self::State(value) + } +} + +impl From> for Error { + fn from(value: miniconf::Error) -> Self { + Self::Miniconf(match value { + miniconf::Error::Finalization(_) => miniconf::Error::Finalization(()), + miniconf::Error::Inner(depth, _) => miniconf::Error::Inner(depth, ()), + miniconf::Error::Traversal(t) => miniconf::Error::Traversal(t), + _ => unimplemented!(), + }) + } +} + +impl From for Error { + fn from(value: miniconf::Traversal) -> Self { + Self::Miniconf(value.into()) + } +} + +impl From> for Error { + fn from(value: minimq::Error) -> Self { + Self::Minimq(value) + } +} mod sm { - use super::{TreeKey, REPUBLISH_TIMEOUT_SECONDS}; + use super::REPUBLISH_TIMEOUT_SECONDS; use minimq::embedded_time::{self, duration::Extensions, Instant}; use smlang::statemachine; statemachine! { transitions: { - *Initial + Connected = ConnectedToBroker, - ConnectedToBroker + IndicatedLife = PendingSubscribe, + *Connect + Connect = Alive, + Alive + Alive = Subscribe, + Subscribe + Subscribe / start_timeout = Wait, + Wait + Tick [timed_out] = Init, + Init + Init = Multipart, - // After initial subscriptions, we start a timeout to republish all settings. - PendingSubscribe + Subscribed / start_republish_timeout = PendingRepublish, + Multipart + Complete = Single, + Single + Multipart = Multipart, - // Settings republish can be completed any time after subscription. - PendingRepublish + StartRepublish = RepublishingSettings, - RepublishingSettings + StartRepublish = RepublishingSettings, - Active + StartRepublish = RepublishingSettings, - - // After republishing settings, we are in an idle "active" state. - RepublishingSettings + Complete = Active, - - // All states transition back to `initial` on reset. - _ + Reset = Initial, + _ + Reset = Connect, } } @@ -69,20 +100,20 @@ mod sm { timeout: None, } } - - pub fn republish_has_timed_out(&self) -> bool { - if let Some(timeout) = self.timeout { - self.clock.try_now().unwrap() > timeout - } else { - false - } - } } impl StateMachineContext for Context { - fn start_republish_timeout(&mut self) { + fn timed_out(&self) -> Result { + Ok(self + .timeout + .map(|t| self.clock.try_now().unwrap() >= t) + .unwrap_or_default()) + } + + fn start_timeout(&mut self) -> Result<(), ()> { self.timeout .replace(self.clock.try_now().unwrap() + REPUBLISH_TIMEOUT_SECONDS.seconds()); + Ok(()) } } } @@ -110,19 +141,37 @@ impl<'a> Command<'a> { } /// Cache correlation data and topic for multi-part responses. -struct ResponseCache { - topic: String, +struct Multipart, const Y: usize> { + iter: Iter, + topic: Option>, correlation_data: Option>, } -impl TryFrom<&minimq::types::Properties<'_>> for ResponseCache { +impl, const Y: usize> Default for Multipart { + fn default() -> Self { + Self { + iter: M::nodes(), + topic: None, + correlation_data: None, + } + } +} + +impl, const Y: usize> Multipart { + fn root(mut self, keys: K) -> Result { + self.iter = self.iter.root(keys)?; + Ok(self) + } +} + +impl, const Y: usize> TryFrom<&minimq::types::Properties<'_>> for Multipart { type Error = &'static str; fn try_from(value: &minimq::types::Properties<'_>) -> Result { let topic = value .into_iter() .response_topic() - .ok_or("No response topic")? - .try_into() + .map(TryInto::try_into) + .transpose() .or(Err("Response topic too long"))?; let correlation_data = value .into_iter() @@ -136,12 +185,35 @@ impl TryFrom<&minimq::types::Properties<'_>> for ResponseCache { .transpose() .or(Err("Correlation data too long"))?; Ok(Self { + iter: M::nodes(), topic, correlation_data, }) } } +#[derive(Debug, Copy, Clone, PartialEq)] +enum ResponseCode { + Ok, + Continue, + Error, +} + +impl From for minimq::Property<'static> { + fn from(value: ResponseCode) -> Self { + let string = match value { + ResponseCode::Ok => "Ok", + ResponseCode::Continue => "Continue", + ResponseCode::Error => "Error", + }; + + minimq::Property::UserProperty( + minimq::types::Utf8String("code"), + minimq::types::Utf8String(string), + ) + } +} + /// MQTT settings interface. /// /// # Design @@ -190,8 +262,7 @@ where mqtt: minimq::Minimq<'buf, Stack, Clock, Broker>, state: sm::StateMachine>, prefix: String, - iter: Iter, - listing: Option, + pending: Multipart, } impl<'buf, Settings, Stack, Clock, Broker, const Y: usize> @@ -213,34 +284,115 @@ where stack: Stack, prefix: &str, clock: Clock, - config: minimq::ConfigBuilder<'buf, Broker>, - ) -> Result { + config: ConfigBuilder<'buf, Broker>, + ) -> Result { + assert!( + prefix.len() + "/settings".len() + Settings::metadata().max_length("/") + <= MAX_TOPIC_LENGTH + ); + // Configure a will so that we can indicate whether or not we are connected. let prefix = String::try_from(prefix).unwrap(); - let mut connection_topic = prefix.clone(); - connection_topic.push_str("/alive").unwrap(); - let will = minimq::Will::new(&connection_topic, b"0", &[])? + let mut alive = prefix.clone(); + alive.push_str("/alive").unwrap(); + let will = minimq::Will::new(&alive, b"0", &[])? .retained() .qos(QoS::AtMostOnce); - let config = config.autodowngrade_qos().will(will)?; - let mqtt = minimq::Minimq::new(stack, clock.clone(), config); - - let max_length = Settings::metadata().max_length("/"); - assert!(prefix.len() + "/settings".len() + max_length <= MAX_TOPIC_LENGTH); - Ok(Self { - mqtt, + mqtt: minimq::Minimq::new(stack, clock.clone(), config), state: sm::StateMachine::new(sm::Context::new(clock)), - iter: Settings::nodes(), prefix, - listing: None, + pending: Multipart::default(), }) } + /// Update the MQTT interface and service the network. + /// + /// # Returns + /// True if the settings changed. False otherwise. + pub fn update(&mut self, settings: &mut Settings) -> Result> { + if !self.mqtt.client().is_connected() { + // Note(unwrap): It's always safe to reset. + self.state.process_event(sm::Events::Reset).unwrap(); + } + + match self.state.state() { + sm::States::Connect => { + if self.mqtt.client().is_connected() { + self.state.process_event(sm::Events::Connect).unwrap(); + } + } + sm::States::Alive => { + if self.alive() { + self.state.process_event(sm::Events::Alive).unwrap(); + } + } + sm::States::Subscribe => { + if self.subscribe() { + self.state.process_event(sm::Events::Subscribe).unwrap(); + } + } + sm::States::Wait => { + self.state.process_event(sm::Events::Tick).unwrap(); + } + sm::States::Init => { + self.pending = Multipart::default(); + self.state.process_event(sm::Events::Init).unwrap(); + } + sm::States::Multipart => self.multipart(settings), + sm::States::Single => { // handled in poll() + } + } + // All states must handle MQTT traffic. + self.poll(settings) + } + + /// Force republication of the current settings. + /// + /// # Note + /// This is intended to be used if modification of a setting had side effects that affected + /// another setting. + pub fn republish(&mut self, path: &str) -> Result<(), Error> { + self.pending = Multipart::default().root(&Path::<_, SEPARATOR>::from(path))?; + self.state.process_event(sm::Events::Multipart)?; + Ok(()) + } + + fn alive(&mut self) -> bool { + // Publish a connection status message. + let mut alive = self.prefix.clone(); + alive.push_str("/alive").unwrap(); + let msg = Publication::new(b"1") + .topic(&alive) + .retain() + .finish() + .unwrap(); + + self.mqtt.client().publish(msg).is_ok() + } + + fn subscribe(&mut self) -> bool { + info!("MQTT connected, subscribing to settings"); + + // Note(unwrap): We construct a string with two more characters than the prefix + // structure, so we are guaranteed to have space for storage. + let mut settings = self.prefix.clone(); + settings.push_str("/settings/#").unwrap(); + let mut list = self.prefix.clone(); + list.push_str("/list").unwrap(); + let opts = SubscriptionOptions::default().ignore_local_messages(); + let topics = [ + TopicFilter::new(&settings).options(opts), + TopicFilter::new(&list).options(opts), + ]; + + self.mqtt.client().subscribe(&topics, &[]).is_ok() + } + fn handle_listing(&mut self) { - let Some(cache) = &mut self.listing else { + let Some(cache) = &mut self.pending else { return; }; @@ -267,7 +419,7 @@ where Err(e) => { // Something went wrong. Abort the listing. error!("Listing failed to build response: {e:?}"); - self.listing.take(); + self.pending.take(); return; } }; @@ -277,14 +429,14 @@ where // If we're done with listing, bail out of the loop. if code != ResponseCode::Continue { - self.listing.take(); + self.pending.take(); break; } } } - fn handle_republish(&mut self, settings: &Settings) { - if self.listing.is_some() { + fn multipart(&mut self, settings: &Settings) { + if self.pending.is_some() { return; } @@ -338,89 +490,7 @@ where } } - fn handle_subscription(&mut self) { - info!("MQTT connected, subscribing to settings"); - - // Note(unwrap): We construct a string with two more characters than the prefix - // structure, so we are guaranteed to have space for storage. - let mut settings_topic = self.prefix.clone(); - settings_topic.push_str("/settings/#").unwrap(); - let mut list_topic = self.prefix.clone(); - list_topic.push_str("/list").unwrap(); - - let opts = SubscriptionOptions::default().ignore_local_messages(); - let topics = [ - TopicFilter::new(&settings_topic).options(opts), - TopicFilter::new(&list_topic).options(opts), - ]; - - if self.mqtt.client().subscribe(&topics, &[]).is_ok() { - self.state.process_event(sm::Events::Subscribed).unwrap(); - } - } - - fn handle_indicating_alive(&mut self) { - // Publish a connection status message. - let mut connection_topic = self.prefix.clone(); - connection_topic.push_str("/alive").unwrap(); - - if self - .mqtt - .client() - .publish( - Publication::new(b"1") - .topic(&connection_topic) - .retain() - .finish() - .unwrap(), - ) - .is_ok() - { - self.state.process_event(sm::Events::IndicatedLife).unwrap(); - } - } - - /// Update the MQTT interface and service the network. - /// - /// # Returns - /// True if the settings changed. False otherwise. - pub fn update(&mut self, settings: &mut Settings) -> Result> { - if !self.mqtt.client().is_connected() { - // Note(unwrap): It's always safe to reset. - self.state.process_event(sm::Events::Reset).unwrap(); - } - - match *self.state.state() { - sm::States::Initial => { - if self.mqtt.client().is_connected() { - self.state.process_event(sm::Events::Connected).unwrap(); - } - } - sm::States::ConnectedToBroker => self.handle_indicating_alive(), - sm::States::PendingSubscribe => self.handle_subscription(), - sm::States::PendingRepublish => { - if self.state.context().republish_has_timed_out() { - self.state - .process_event(sm::Events::StartRepublish) - .unwrap(); - } - } - sm::States::RepublishingSettings => self.handle_republish(settings), - - // Nothing to do in the active state. - sm::States::Active => {} - } - - self.handle_listing(); - - // All states must handle MQTT traffic. - self.handle_mqtt_traffic(settings) - } - - fn handle_mqtt_traffic( - &mut self, - settings: &mut Settings, - ) -> Result> { + fn poll(&mut self, settings: &mut Settings) -> Result> { let mut updated = false; let poll = self.mqtt.poll(|client, topic, payload, properties| { let Some(topic) = topic.strip_prefix(self.prefix.as_str()) else { @@ -443,15 +513,15 @@ where return; } - let response = if self.listing.is_some() { + let response = if self.pending.is_some() { "`List` already in progress" } else { - match ResponseCache::try_from(properties) { + match Multipart::try_from(properties) { Err(msg) => msg, Ok(cache) => { - self.listing.replace(cache); + self.pending.replace(cache); self.iter = - Settings::nodes().root(&Path::<_, '/'>::from(path)).unwrap(); + Settings::nodes().root(&Path::<_, SEPARATOR>::from(path)).unwrap(); // There is no positive response sent during list commands, // instead, the response is sent as a property of the listed @@ -550,38 +620,7 @@ where self.state.process_event(sm::Events::Reset).unwrap(); Ok(false) } - Err(other) => Err(other), + Err(other) => Err(other.into()), } } - - /// Force republication of the current settings. - /// - /// # Note - /// This is intended to be used if modification of a setting had side effects that affected - /// another setting. - pub fn force_republish(&mut self) { - self.state.process_event(sm::Events::StartRepublish).ok(); - } -} - -#[derive(Debug, Copy, Clone, PartialEq)] -enum ResponseCode { - Ok, - Continue, - Error, -} - -impl From for minimq::Property<'static> { - fn from(value: ResponseCode) -> Self { - let string = match value { - ResponseCode::Ok => "Ok", - ResponseCode::Continue => "Continue", - ResponseCode::Error => "Error", - }; - - minimq::Property::UserProperty( - minimq::types::Utf8String("code"), - minimq::types::Utf8String(string), - ) - } } From 1530daf7b115b76f5ba0928bf03fab04b0d2406a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Fri, 5 Jul 2024 18:23:56 +0200 Subject: [PATCH 03/24] cli: yafnv update --- miniconf_cli/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/miniconf_cli/src/lib.rs b/miniconf_cli/src/lib.rs index 03f5798e..c45ab0a8 100644 --- a/miniconf_cli/src/lib.rs +++ b/miniconf_cli/src/lib.rs @@ -217,7 +217,7 @@ where (val as _, rest) } }; - let check: u32 = yafnv::fnv1a(val.iter().copied()); + let check: u32 = yafnv::fnv1a(val); awrite(&mut write, " ".as_bytes()).await?; let rl = rest.len(); let mut sl = &mut rest[..]; @@ -234,7 +234,7 @@ where Err(miniconf::Error::Traversal(Traversal::Absent(_depth))) => "absent".as_bytes(), ret => &buf[..ret?], }; - if yafnv::fnv1a::(def.iter().copied()) == check { + if yafnv::fnv1a::(def) == check { awrite(&mut write, " (default)\n".as_bytes()).await?; } else { awrite(&mut write, " (default: ".as_bytes()).await?; From 3d11be944d35d7039c39c6f71b510beefe5b761d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 13:20:30 +0200 Subject: [PATCH 04/24] rewrite miniconf mqtt client --- miniconf_mqtt/README.md | 8 + miniconf_mqtt/examples/mqtt.rs | 2 +- miniconf_mqtt/src/lib.rs | 422 ++++++++++++-------------- py/miniconf-mqtt/miniconf/__main__.py | 9 +- py/miniconf-mqtt/miniconf/miniconf.py | 18 +- 5 files changed, 228 insertions(+), 231 deletions(-) diff --git a/miniconf_mqtt/README.md b/miniconf_mqtt/README.md index d010d150..07ddd073 100644 --- a/miniconf_mqtt/README.md +++ b/miniconf_mqtt/README.md @@ -1,3 +1,11 @@ # `miniconf` MQTT Client This package contains a MQTT client exposing a [`miniconf`](https://crates.io/crates/miniconf) interface via MQTT using [`minimq`](https://crates.io/crates/minimq). + +| Command | Node | Response Topic | Payload | +| --- | --- | --- | --- | +| Get | Leaf | set | empty | +| List | Internal | set | empty | +| Dump | | not set | empty | +| Set | Leaf | | some | +| Error | Internal | | some | diff --git a/miniconf_mqtt/examples/mqtt.rs b/miniconf_mqtt/examples/mqtt.rs index 90494418..62c08a4a 100644 --- a/miniconf_mqtt/examples/mqtt.rs +++ b/miniconf_mqtt/examples/mqtt.rs @@ -106,7 +106,7 @@ async fn main() { } if settings.exit { - break; + // break; } tokio::time::sleep(Duration::from_millis(10)).await; diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index aa9eefe3..f01e9a06 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -5,20 +5,22 @@ #![forbid(unsafe_code)] //! The Minimq MQTT client for `miniconf``. +use core::fmt::Display; + use heapless::{String, Vec}; -use log::{error, info, warn}; +use log::{debug, error, info, warn}; use miniconf::{IntoKeys, JsonCoreSlash, NodeIter, Path, Traversal, TreeKey}; pub use minimq; use minimq::{ embedded_nal::TcpClientStack, embedded_time, - types::{SubscriptionOptions, TopicFilter}, + types::{Properties, SubscriptionOptions, TopicFilter}, ConfigBuilder, DeferredPublication, ProtocolError, Publication, QoS, }; use embedded_io::Write; -// The maximum topic length of any settings path. +// The maximum topic length of any topic (prefix + "/settings" + miniconf path). const MAX_TOPIC_LENGTH: usize = 128; // The maximum amount of correlation data that will be cached for listing. This is set to function @@ -33,9 +35,14 @@ const SEPARATOR: char = '/'; type Iter = NodeIter, SEPARATOR>>; +/// Miniconf MQTT joint error type +#[derive(Debug, PartialEq)] pub enum Error { + /// Miniconf Miniconf(miniconf::Error<()>), + /// State machine State(sm::Error), + /// Minimq Minimq(minimq::Error), } @@ -79,11 +86,9 @@ mod sm { Alive + Alive = Subscribe, Subscribe + Subscribe / start_timeout = Wait, Wait + Tick [timed_out] = Init, - Init + Init = Multipart, - + Init + Multipart = Multipart, Multipart + Complete = Single, Single + Multipart = Multipart, - _ + Reset = Connect, } } @@ -118,32 +123,10 @@ mod sm { } } -enum Command<'a> { - List { path: &'a str }, - Get { path: &'a str }, - Set { path: &'a str, value: &'a [u8] }, -} - -impl<'a> Command<'a> { - fn try_from_message(topic: &'a str, value: &'a [u8]) -> Result { - if topic == "/list" { - let path = core::str::from_utf8(value).or(Err(()))?; - Ok(Command::List { path }) - } else { - let path = topic.strip_prefix("/settings").ok_or(())?; - if value.is_empty() { - Ok(Command::Get { path }) - } else { - Ok(Command::Set { path, value }) - } - } - } -} - /// Cache correlation data and topic for multi-part responses. struct Multipart, const Y: usize> { iter: Iter, - topic: Option>, + response_topic: Option>, correlation_data: Option>, } @@ -151,7 +134,7 @@ impl, const Y: usize> Default for Multipart { fn default() -> Self { Self { iter: M::nodes(), - topic: None, + response_topic: None, correlation_data: None, } } @@ -186,7 +169,7 @@ impl, const Y: usize> TryFrom<&minimq::types::Properties<'_>> for .or(Err("Correlation data too long"))?; Ok(Self { iter: M::nodes(), - topic, + response_topic: topic, correlation_data, }) } @@ -335,13 +318,18 @@ where } } sm::States::Wait => { - self.state.process_event(sm::Events::Tick).unwrap(); + self.state.process_event(sm::Events::Tick).ok(); } sm::States::Init => { - self.pending = Multipart::default(); - self.state.process_event(sm::Events::Init).unwrap(); + self.publish("").unwrap(); + } + sm::States::Multipart => { + if self.pending.response_topic.is_some() { + self.iter_list(); + } else { + self.iter_dump(settings); + } } - sm::States::Multipart => self.multipart(settings), sm::States::Single => { // handled in poll() } } @@ -349,23 +337,13 @@ where self.poll(settings) } - /// Force republication of the current settings. - /// - /// # Note - /// This is intended to be used if modification of a setting had side effects that affected - /// another setting. - pub fn republish(&mut self, path: &str) -> Result<(), Error> { - self.pending = Multipart::default().root(&Path::<_, SEPARATOR>::from(path))?; - self.state.process_event(sm::Events::Multipart)?; - Ok(()) - } - fn alive(&mut self) -> bool { // Publish a connection status message. let mut alive = self.prefix.clone(); alive.push_str("/alive").unwrap(); let msg = Publication::new(b"1") .topic(&alive) + .qos(QoS::AtLeastOnce) .retain() .finish() .unwrap(); @@ -391,236 +369,234 @@ where self.mqtt.client().subscribe(&topics, &[]).is_ok() } - fn handle_listing(&mut self) { - let Some(cache) = &mut self.pending else { - return; - }; + /// Force republication of the current settings. + /// + /// # Note + /// This is intended to be used if modification of a setting had side effects that affected + /// another setting. + pub fn publish(&mut self, path: &str) -> Result<(), Error> { + self.pending = Multipart::default().root(&Path::<_, SEPARATOR>::from(path))?; + self.state.process_event(sm::Events::Multipart)?; + Ok(()) + } + fn iter_list(&mut self) { while self.mqtt.client().can_publish(QoS::AtLeastOnce) { - // Note(unwrap): Publishing should not fail because `can_publish()` was checked before - // attempting this publish. - let (code, path) = match self.iter.next() { - Some(path) => (ResponseCode::Continue, path.unwrap().0.into_inner()), + let (code, path) = match self.pending.iter.next() { + Some(path) => { + let (path, node) = path.unwrap(); // Note(unwrap) checked capacity + assert!(node.is_leaf()); + (ResponseCode::Continue, path.into_inner()) + } None => (ResponseCode::Ok, String::new()), }; let props = [code.into()]; - let mut outgoing = Publication::new(path.as_bytes()) - .topic(&cache.topic) + let mut response = Publication::new(path.as_bytes()) + .topic(self.pending.response_topic.as_ref().unwrap()) // Note(unwrap) checked in update() .properties(&props) .qos(QoS::AtLeastOnce); - if let Some(cd) = &cache.correlation_data { - outgoing = outgoing.correlate(cd); + if let Some(cd) = &self.pending.correlation_data { + response = response.correlate(cd); } - let publication = match outgoing.finish() { - Ok(response) => response, - Err(e) => { - // Something went wrong. Abort the listing. - error!("Listing failed to build response: {e:?}"); - self.pending.take(); - return; - } - }; - - // Note(unwrap) We already checked that we can publish earlier. - self.mqtt.client().publish(publication).unwrap(); + self.mqtt + .client() + .publish(response.finish().unwrap()) // Note(unwrap): has topic + .unwrap(); // Note(unwrap) checked can_publish() - // If we're done with listing, bail out of the loop. if code != ResponseCode::Continue { - self.pending.take(); + self.state.process_event(sm::Events::Complete).unwrap(); break; } } } - fn multipart(&mut self, settings: &Settings) { - if self.pending.is_some() { - return; - } - - while self.mqtt.client().can_publish(QoS::AtMostOnce) { - let Some(path) = self.iter.next() else { - // If we got here, we completed iterating over the topics and published them all. + fn iter_dump(&mut self, settings: &Settings) { + while self.mqtt.client().can_publish(QoS::AtLeastOnce) { + let Some(path) = self.pending.iter.next() else { self.state.process_event(sm::Events::Complete).unwrap(); break; }; - let (path, _node) = path.unwrap(); + let (path, node) = path.unwrap(); + assert!(node.is_leaf()); - let mut prefixed_topic = self.prefix.clone(); - prefixed_topic + let mut topic = self.prefix.clone(); + topic .push_str("/settings") - .and_then(|_| prefixed_topic.push_str(&path)) + .and_then(|_| topic.push_str(&path)) .unwrap(); - // If the topic is not present, we'll fail to serialize the setting into the - // payload and will never publish. The iterator has already incremented, so this is - // acceptable. - let response = DeferredPublication::new(|buf| settings.get_json_by_key(&path, buf)) - .topic(&prefixed_topic) - .finish() - .unwrap(); + let mut response = DeferredPublication::new(|buf| settings.get_json_by_key(&path, buf)) + .topic(&topic) + .qos(QoS::AtLeastOnce); + + if let Some(cd) = &self.pending.correlation_data { + response = response.correlate(cd); + } - // Note(unwrap): This should not fail because `can_publish()` was checked before - // attempting this publish. - match self.mqtt.client().publish(response) { - Err(minimq::PubError::Serialization(Error::Traversal(Traversal::Absent(_)))) => {} + // Note(unwrap): has topic + match self.mqtt.client().publish(response.finish().unwrap()) { + Err(minimq::PubError::Serialization(miniconf::Error::Traversal( + Traversal::Absent(_), + ))) => {} - // If the value is too large to serialize, print an error to the topic instead Err(minimq::PubError::Error(minimq::Error::Minimq( minimq::MinimqError::Protocol(minimq::ProtocolError::Serialization( minimq::SerError::InsufficientMemory, )), ))) => { + let props = [ResponseCode::Error.into()]; + let mut response = Publication::new(b"") + .topic(&topic) + .properties(&props) + .qos(QoS::AtLeastOnce); + + if let Some(cd) = &self.pending.correlation_data { + response = response.correlate(cd); + } + self.mqtt .client() - .publish( - Publication::new(b"") - .topic(&prefixed_topic) - .properties(&[ResponseCode::Error.into()]) - .finish() - .unwrap(), - ) - .unwrap(); + .publish(response.finish().unwrap()) // Note(unwrap): has topic + .map_err(|err| info!("Dump reply failed: {err:?}")) + .ok(); } - other => other.unwrap(), + other => other.unwrap(), // Note(unwrap): checked can_publish } } } - fn poll(&mut self, settings: &mut Settings) -> Result> { - let mut updated = false; - let poll = self.mqtt.poll(|client, topic, payload, properties| { - let Some(topic) = topic.strip_prefix(self.prefix.as_str()) else { - info!("Unexpected topic prefix: {topic}"); - return; - }; - - let Ok(command) = Command::try_from_message(topic, payload) else { - info!("Unknown miniconf command: {topic}"); - return; - }; - - match command { - Command::List { path } => { - if !properties - .into_iter() - .any(|prop| matches!(prop, Ok(minimq::Property::ResponseTopic(_)))) - { - info!("Discarding `List` without `ResponseTopic`"); - return; - } - - let response = if self.pending.is_some() { - "`List` already in progress" - } else { - match Multipart::try_from(properties) { - Err(msg) => msg, - Ok(cache) => { - self.pending.replace(cache); - self.iter = - Settings::nodes().root(&Path::<_, SEPARATOR>::from(path)).unwrap(); - - // There is no positive response sent during list commands, - // instead, the response is sent as a property of the listed - // elements. As such, we are now finished processing a list - // command. - return; - } - } - }; + fn response<'a>( + response: &str, + code: ResponseCode, + request: &Properties<'a>, + client: &mut minimq::mqtt_client::MqttClient<'buf, Stack, Clock, Broker>, + ) -> Result<(), minimq::PubError> { + client.publish( + minimq::Publication::new(response.as_bytes()) + .reply(request) + .properties(&[code.into()]) + .qos(QoS::AtLeastOnce) + .finish() + .map_err(minimq::Error::from)?, + ) + } - let props = [ResponseCode::Error.into()]; - if let Ok(response) = minimq::Publication::new(response.as_bytes()) - .reply(properties) - .properties(&props) - .qos(QoS::AtLeastOnce) - .finish() - { - client.publish(response).ok(); - } - } + fn deferred_response<'a, T: Display>( + response: &T, + code: ResponseCode, + request: &Properties<'a>, + client: &mut minimq::mqtt_client::MqttClient<'buf, Stack, Clock, Broker>, + ) -> Result< + (), + minimq::PubError>, + > { + client.publish( + DeferredPublication::new(|mut buf| { + let start = buf.len(); + write!(buf, "", response).and_then(|_| Ok(start - buf.len())) + }) + .reply(request) + .properties(&[code.into()]) + .qos(QoS::AtLeastOnce) + .finish() + .map_err(minimq::Error::from)?, + ) + } - Command::Get { path } => { - let props = [ResponseCode::Ok.into()]; - let Ok(message) = DeferredPublication::new(|buf| settings.get_json(path, buf)) - .properties(&props) + fn poll(&mut self, settings: &mut Settings) -> Result> { + let Self { + mqtt, + state, + prefix, + pending, + } = self; + mqtt.poll(|client, topic, payload, properties| -> bool { + let Some(path) = topic + .strip_prefix(prefix.as_str()) + .and_then(|p| p.strip_prefix("/settings")) + else { + info!("Unexpected topic: {topic}"); + return false; + }; + if payload.is_empty() { + // Try Get assuming Leaf node + if let Err(err) = client.publish( + DeferredPublication::new(|buf| settings.get_json(path, buf)) + .topic(topic) .reply(properties) - // Override the response topic with the path. + .properties(&[ResponseCode::Ok.into()]) .qos(QoS::AtLeastOnce) .finish() - else { - // If we can't create the publication, it's because there's no way to reply - // to the message. Since we don't know where to send things, abort now and - // complete handling of the `Get` request. - return; - }; - - match client.publish(message) { - Ok(()) => {} - Err(err) => {} - Err(minimq::PubError::Serialization(miniconf::Error::Traversal( - Traversal::TooShort(depth), - ))) => { - // Internal node - // TODO: iter update + .unwrap(), // Note(unwrap): has topic + ) { + match err { + minimq::PubError::Serialization(miniconf::Error::Traversal( + Traversal::TooShort(_depth), + )) => { + // Internal node: Dump or List + if let Some(response) = if state.state() == &sm::States::Multipart { + Some("") + } else { + match Multipart::::try_from(properties).and_then(|m| { + m.root(&Path::<_, '/'>::from(path)).map_err(|err| { + debug!("List/Pub root: {err}"); + "" + }) + }) { + Err(msg) => Some(msg), + Ok(m) => { + *pending = m; + state.process_event(sm::Events::Multipart).unwrap(); + None + } + } + } { + Self::response(response, ResponseCode::Error, properties, client) + .map_err(|err| info!("Dump/List error failure: {err:?}")) + .ok(); + } } - Err(minimq::PubError::Serialization(err)) => { - if let Ok(message) = DeferredPublication::new(|mut buf| { - let start = buf.len(); - write!(buf, "{}", err).and_then(|_| Ok(start - buf.len())) - }) - .properties(&[ResponseCode::Error.into()]) - .reply(properties) - .qos(QoS::AtLeastOnce) - .finish() - { - // Try to send the error as a best-effort. If we don't have enough - // buffer space to encode the error, there's nothing more we can do. - client.publish(message).ok(); - }; + minimq::PubError::Serialization(err) => { + // Get serialization error + Self::deferred_response(&err, ResponseCode::Error, properties, client) + .map_err(|err| info!("Serialization error failure: {err:?}")) + .ok(); } - } - } - - Command::Set { path, value } => match settings.set_json(path, value) { - Ok(_depth) => { - updated = true; - if let Ok(response) = Publication::new("OK".as_bytes()) - .properties(&[ResponseCode::Ok.into()]) - .reply(properties) - .qos(QoS::AtLeastOnce) - .finish() - { - client.publish(response).ok(); - } - } - Err(err) => { - if let Ok(response) = DeferredPublication::new(|mut buf| { - let start = buf.len(); - write!(buf, "{}", err).and_then(|_| Ok(start - buf.len())) - }) - .properties(&[ResponseCode::Error.into()]) - .reply(properties) - .qos(QoS::AtLeastOnce) - .finish() - { - client.publish(response).ok(); + minimq::PubError::Error(err) => { + error!("Get error failure: {err:?}"); } } - }, + } + false + } else { + // Set + settings + .set_json(path, payload) + .map(|_depth| { + Self::response("OK", ResponseCode::Ok, properties, client) + .map_err(|err| info!("Set OK response: {err:?}")) + .ok() + }) + .map_err(|err| { + Self::deferred_response(&err, ResponseCode::Error, properties, client) + .map_err(|err| info!("Set error response: {err:?}")) + .ok() + }) + .is_ok() } - }); - match poll { - Ok(_) => Ok(updated), - Err(minimq::Error::SessionReset) => { + }) + .map(Option::unwrap_or_default) + .or_else(|err| match err { + minimq::Error::SessionReset => { warn!("Session reset"); self.state.process_event(sm::Events::Reset).unwrap(); Ok(false) } - Err(other) => Err(other.into()), - } + other => Err(other.into()), + }) } } diff --git a/py/miniconf-mqtt/miniconf/__main__.py b/py/miniconf-mqtt/miniconf/__main__.py index b4189c9f..52ae016c 100644 --- a/py/miniconf-mqtt/miniconf/__main__.py +++ b/py/miniconf-mqtt/miniconf/__main__.py @@ -95,7 +95,13 @@ async def run(): interface = Miniconf(client, prefix) for arg in args.paths: - assert (not arg) or arg.startswith("/") + assert arg.startswith("/") or arg in ["", "?", "="] + + if arg.endswith("?"): + await interface.dump(arg[:-1]) + print(f"Dumped {arg}") + continue + try: path, value = arg.split("=", 1) except ValueError: @@ -105,7 +111,6 @@ async def run(): if not value: await interface.clear(path) print(f"Cleared retained {path}: OK") - else: await interface.set(path, json.loads(value), args.retain) print(f"Set {path} to {value}: OK") diff --git a/py/miniconf-mqtt/miniconf/miniconf.py b/py/miniconf-mqtt/miniconf/miniconf.py index 1ed1cfa0..7622741d 100644 --- a/py/miniconf-mqtt/miniconf/miniconf.py +++ b/py/miniconf-mqtt/miniconf/miniconf.py @@ -107,7 +107,7 @@ def _dispatch(self, message: Message): ) del self._inflight[response_id] - async def _do(self, topic: str, **kwargs): + async def _do(self, topic: str, *, response=True, **kwargs): await self.subscribed.wait() request_id = uuid.uuid1().bytes @@ -116,7 +116,8 @@ async def _do(self, topic: str, **kwargs): self._inflight[request_id] = fut, [] props = Properties(PacketTypes.PUBLISH) - props.ResponseTopic = self.response_topic + if response: + props.ResponseTopic = self.response_topic props.CorrelationData = request_id LOGGER.info(f"Publishing {topic}: {kwargs['payload']}, [{props}]") await self.client.publish( @@ -124,7 +125,8 @@ async def _do(self, topic: str, **kwargs): properties=props, **kwargs, ) - return await fut + if response: + return await fut async def set(self, path, value, retain=False): """Write the provided data to the specified path. @@ -142,9 +144,15 @@ async def set(self, path, value, retain=False): assert len(ret) == 1 return ret[0] - async def list_paths(self): + async def list_paths(self, path=""): """Get a list of all the paths available on the device.""" - return await self._do(topic=f"{self.prefix}/list", payload="") + return await self._do(topic=f"{self.prefix}/settings{path}", payload="") + + async def dump(self, path=""): + """Get a list of all the paths available on the device.""" + return await self._do( + topic=f"{self.prefix}/settings{path}", payload="", response=False + ) async def get(self, path): """Get the specific value of a given path. From d011c1757090581af6774156257ad53d83d9c4aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 13:35:45 +0200 Subject: [PATCH 05/24] Refine mqtt py --- miniconf_mqtt/examples/mqtt.rs | 2 +- miniconf_mqtt/src/lib.rs | 62 +++++++++------------------ py/miniconf-mqtt/miniconf/miniconf.py | 12 +++--- 3 files changed, 28 insertions(+), 48 deletions(-) diff --git a/miniconf_mqtt/examples/mqtt.rs b/miniconf_mqtt/examples/mqtt.rs index 62c08a4a..90494418 100644 --- a/miniconf_mqtt/examples/mqtt.rs +++ b/miniconf_mqtt/examples/mqtt.rs @@ -106,7 +106,7 @@ async fn main() { } if settings.exit { - // break; + break; } tokio::time::sleep(Duration::from_millis(10)).await; diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index f01e9a06..ce75e7db 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -469,42 +469,31 @@ where } } - fn response<'a>( - response: &str, + fn response<'a, T: Display>( + response: T, code: ResponseCode, request: &Properties<'a>, client: &mut minimq::mqtt_client::MqttClient<'buf, Stack, Clock, Broker>, - ) -> Result<(), minimq::PubError> { - client.publish( - minimq::Publication::new(response.as_bytes()) + ) -> Result< + (), + minimq::PubError>, + > { + client + .publish( + DeferredPublication::new(|mut buf| { + let start = buf.len(); + write!(buf, "", response).and_then(|_| Ok(start - buf.len())) + }) .reply(request) .properties(&[code.into()]) .qos(QoS::AtLeastOnce) .finish() .map_err(minimq::Error::from)?, - ) - } - - fn deferred_response<'a, T: Display>( - response: &T, - code: ResponseCode, - request: &Properties<'a>, - client: &mut minimq::mqtt_client::MqttClient<'buf, Stack, Clock, Broker>, - ) -> Result< - (), - minimq::PubError>, - > { - client.publish( - DeferredPublication::new(|mut buf| { - let start = buf.len(); - write!(buf, "", response).and_then(|_| Ok(start - buf.len())) + ) + .map_err(|err| { + info!("Response failure: {err:?}"); + err }) - .reply(request) - .properties(&[code.into()]) - .qos(QoS::AtLeastOnce) - .finish() - .map_err(minimq::Error::from)?, - ) } fn poll(&mut self, settings: &mut Settings) -> Result> { @@ -539,12 +528,12 @@ where )) => { // Internal node: Dump or List if let Some(response) = if state.state() == &sm::States::Multipart { - Some("") + Some("Pending multipart response") } else { match Multipart::::try_from(properties).and_then(|m| { m.root(&Path::<_, '/'>::from(path)).map_err(|err| { debug!("List/Pub root: {err}"); - "" + "Root not found" }) }) { Err(msg) => Some(msg), @@ -556,15 +545,12 @@ where } } { Self::response(response, ResponseCode::Error, properties, client) - .map_err(|err| info!("Dump/List error failure: {err:?}")) .ok(); } } minimq::PubError::Serialization(err) => { // Get serialization error - Self::deferred_response(&err, ResponseCode::Error, properties, client) - .map_err(|err| info!("Serialization error failure: {err:?}")) - .ok(); + Self::response(err, ResponseCode::Error, properties, client).ok(); } minimq::PubError::Error(err) => { error!("Get error failure: {err:?}"); @@ -576,15 +562,9 @@ where // Set settings .set_json(path, payload) - .map(|_depth| { - Self::response("OK", ResponseCode::Ok, properties, client) - .map_err(|err| info!("Set OK response: {err:?}")) - .ok() - }) + .map(|_depth| Self::response("OK", ResponseCode::Ok, properties, client).ok()) .map_err(|err| { - Self::deferred_response(&err, ResponseCode::Error, properties, client) - .map_err(|err| info!("Set error response: {err:?}")) - .ok() + Self::response(err, ResponseCode::Error, properties, client).ok() }) .is_ok() } diff --git a/py/miniconf-mqtt/miniconf/miniconf.py b/py/miniconf-mqtt/miniconf/miniconf.py index 7622741d..7d4a3473 100644 --- a/py/miniconf-mqtt/miniconf/miniconf.py +++ b/py/miniconf-mqtt/miniconf/miniconf.py @@ -144,14 +144,14 @@ async def set(self, path, value, retain=False): assert len(ret) == 1 return ret[0] - async def list_paths(self, path=""): - """Get a list of all the paths available on the device.""" - return await self._do(topic=f"{self.prefix}/settings{path}", payload="") + async def list_paths(self, root=""): + """Get a list of all the paths below a given root.""" + return await self._do(topic=f"{self.prefix}/settings{root}", payload="") - async def dump(self, path=""): - """Get a list of all the paths available on the device.""" + async def dump(self, root=""): + """Dump all the paths at or below a given root into the namespace.""" return await self._do( - topic=f"{self.prefix}/settings{path}", payload="", response=False + topic=f"{self.prefix}/settings{root}", payload="", response=False ) async def get(self, path): From ddcbd898cda30519e3eea8dd40117e5d810b6c5b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 13:39:14 +0200 Subject: [PATCH 06/24] changelog --- CHANGELOG.md | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ffc24356..8b1cdfbb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `Keys::is_empty()` -> `Keys::finalize()` * `traverse_by_key` ensures `Keys::finalize()` * `NodeIter::count()` -> `NodeIter::exact_size()` to disambiguate from `Iterator::count()` +* The MQTT client performs path listings by publishing an empty payload to an internal node + with a response topic +* [MQTT, Python] The `Miniconf::create` method is no longer used. Instead, an `aiomqtt::Client` +must be passed to Miniconf ### Added @@ -27,16 +31,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `miniconf_cli`: a menu/command line interface * `Path`, `JsonPath`/`JsonPathIter`, `Indices`, `KeysIter` wrapper types for more ergonomic/succinct `Transcode`/`IntoKeys`/`Keys` handling +* [MQTT] support on-demand and partial dump (previously called repubish and executed only once) by posting + the empty payload to an internal node without a response topic +* [MQTT] support partial listing ### Removed * `digits()` gone in favor of using `usize::checked_ilog10()` * `rust_version` and `MSRV`: these crates aim to support the latest stable version of rust -### Changed -* [breaking-python] The `Miniconf::create` method is no longer used. Instead, an `aiomqtt::Client` -must be passed to Miniconf - ## [0.11.0](https://github.com/quartiq/miniconf/compare/v0.10.1...v0.11.0) - 2024-04-30 ### Changed From 3f0331f4d426d282b4a8717520a56f53f8d4ad08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 13:50:43 +0200 Subject: [PATCH 07/24] jsonpath: use ControlFlow for less Result abuse --- miniconf/src/jsonpath.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/miniconf/src/jsonpath.rs b/miniconf/src/jsonpath.rs index ffb023bd..a4576213 100644 --- a/miniconf/src/jsonpath.rs +++ b/miniconf/src/jsonpath.rs @@ -1,6 +1,6 @@ use core::{ fmt::Write, - ops::{Deref, DerefMut}, + ops::{ControlFlow::*, Deref, DerefMut}, }; use serde::{Deserialize, Serialize}; @@ -57,17 +57,16 @@ impl<'a> Iterator for JsonPathIter<'a> { type Item = &'a str; fn next(&mut self) -> Option { - // Reappropriation of `Result` as `Either` for (open, close) in [ - (".'", Ok("'")), // "'" inclusive - (".", Err(&['.', '['][..])), // '.' or '[' exclusive - ("['", Ok("']")), // "']" inclusive - ("[", Ok("]")), // "]" inclusive + (".'", Continue("'")), // "'" inclusive + (".", Break(&['.', '['][..])), // '.' or '[' exclusive + ("['", Continue("']")), // "']" inclusive + ("[", Continue("]")), // "]" inclusive ] { if let Some(rest) = self.0.strip_prefix(open) { let (end, sep) = match close { - Err(close) => (rest.find(close).unwrap_or(rest.len()), 0), - Ok(close) => (rest.find(close)?, close.len()), + Break(close) => (rest.find(close).unwrap_or(rest.len()), 0), + Continue(close) => (rest.find(close)?, close.len()), }; let (next, rest) = rest.split_at(end); self.0 = &rest[sep..]; From 60ee9062d219345bd91242979a7f95faf2e04fd1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 14:07:06 +0200 Subject: [PATCH 08/24] mqtt: streamline control flow --- miniconf_mqtt/src/lib.rs | 50 +++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index ce75e7db..009e11f6 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -469,7 +469,7 @@ where } } - fn response<'a, T: Display>( + fn respond<'a, T: Display>( response: T, code: ResponseCode, request: &Properties<'a>, @@ -512,7 +512,8 @@ where return false; }; if payload.is_empty() { - // Try Get assuming Leaf node + // Get, Dump, List + // Try a Get assuming a leaf node if let Err(err) = client.publish( DeferredPublication::new(|buf| settings.get_json(path, buf)) .topic(topic) @@ -526,31 +527,34 @@ where minimq::PubError::Serialization(miniconf::Error::Traversal( Traversal::TooShort(_depth), )) => { - // Internal node: Dump or List - if let Some(response) = if state.state() == &sm::States::Multipart { - Some("Pending multipart response") - } else { - match Multipart::::try_from(properties).and_then(|m| { + // Internal node: try Dump or List + (state.state() != &sm::States::Multipart) + .then_some(()) + .ok_or("Pending multipart response") + .and_then(|()| Multipart::::try_from(properties)) + .and_then(|m| { m.root(&Path::<_, '/'>::from(path)).map_err(|err| { debug!("List/Pub root: {err}"); "Root not found" }) - }) { - Err(msg) => Some(msg), - Ok(m) => { - *pending = m; - state.process_event(sm::Events::Multipart).unwrap(); - None - } - } - } { - Self::response(response, ResponseCode::Error, properties, client) + }) + .map(|m| { + *pending = m; + state.process_event(sm::Events::Multipart).unwrap(); + }) + .map_err(|response| { + Self::respond( + response, + ResponseCode::Error, + properties, + client, + ) .ok(); - } + }) + .ok(); } minimq::PubError::Serialization(err) => { - // Get serialization error - Self::response(err, ResponseCode::Error, properties, client).ok(); + Self::respond(err, ResponseCode::Error, properties, client).ok(); } minimq::PubError::Error(err) => { error!("Get error failure: {err:?}"); @@ -562,10 +566,8 @@ where // Set settings .set_json(path, payload) - .map(|_depth| Self::response("OK", ResponseCode::Ok, properties, client).ok()) - .map_err(|err| { - Self::response(err, ResponseCode::Error, properties, client).ok() - }) + .map(|_depth| Self::respond("OK", ResponseCode::Ok, properties, client).ok()) + .map_err(|err| Self::respond(err, ResponseCode::Error, properties, client).ok()) .is_ok() } }) From c603a3219155dbf214df701a010d611a5084def7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 14:21:38 +0200 Subject: [PATCH 09/24] rename --- miniconf_mqtt/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index 009e11f6..6dd5374a 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -150,7 +150,7 @@ impl, const Y: usize> Multipart { impl, const Y: usize> TryFrom<&minimq::types::Properties<'_>> for Multipart { type Error = &'static str; fn try_from(value: &minimq::types::Properties<'_>) -> Result { - let topic = value + let response_topic = value .into_iter() .response_topic() .map(TryInto::try_into) @@ -169,7 +169,7 @@ impl, const Y: usize> TryFrom<&minimq::types::Properties<'_>> for .or(Err("Correlation data too long"))?; Ok(Self { iter: M::nodes(), - response_topic: topic, + response_topic, correlation_data, }) } From 329a1bc93e6c87f1474b71bd7f2d07e98c98e0f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 15:35:16 +0200 Subject: [PATCH 10/24] Changed enum, readability --- miniconf_mqtt/src/lib.rs | 99 +++++++++++++++++++++++----------------- 1 file changed, 57 insertions(+), 42 deletions(-) diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index 6dd5374a..c61017ce 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -304,6 +304,7 @@ where match self.state.state() { sm::States::Connect => { if self.mqtt.client().is_connected() { + info!("Connected"); self.state.process_event(sm::Events::Connect).unwrap(); } } @@ -314,6 +315,7 @@ where } sm::States::Subscribe => { if self.subscribe() { + info!("Subscribed"); self.state.process_event(sm::Events::Subscribe).unwrap(); } } @@ -321,6 +323,7 @@ where self.state.process_event(sm::Events::Tick).ok(); } sm::States::Init => { + info!("Republishing"); self.publish("").unwrap(); } sm::States::Multipart => { @@ -334,7 +337,7 @@ where } } // All states must handle MQTT traffic. - self.poll(settings) + self.poll(settings).map(|c| c == Changed::Changed) } fn alive(&mut self) -> bool { @@ -346,16 +349,12 @@ where .qos(QoS::AtLeastOnce) .retain() .finish() - .unwrap(); + .unwrap(); // Note(unwrap): has topic self.mqtt.client().publish(msg).is_ok() } fn subscribe(&mut self) -> bool { - info!("MQTT connected, subscribing to settings"); - - // Note(unwrap): We construct a string with two more characters than the prefix - // structure, so we are guaranteed to have space for storage. let mut settings = self.prefix.clone(); settings.push_str("/settings/#").unwrap(); let mut list = self.prefix.clone(); @@ -382,14 +381,16 @@ where fn iter_list(&mut self) { while self.mqtt.client().can_publish(QoS::AtLeastOnce) { - let (code, path) = match self.pending.iter.next() { - Some(path) => { + let (code, path) = self + .pending + .iter + .next() + .map(|path| { let (path, node) = path.unwrap(); // Note(unwrap) checked capacity - assert!(node.is_leaf()); + assert!(node.is_leaf()); // Note(assert): Iterator depth unlimited (ResponseCode::Continue, path.into_inner()) - } - None => (ResponseCode::Ok, String::new()), - }; + }) + .unwrap_or((ResponseCode::Ok, String::new())); let props = [code.into()]; let mut response = Publication::new(path.as_bytes()) @@ -420,8 +421,8 @@ where break; }; - let (path, node) = path.unwrap(); - assert!(node.is_leaf()); + let (path, node) = path.unwrap(); // Note(unwraped): checked capacity + assert!(node.is_leaf()); // Note(assert): Iterator depth unlimited let mut topic = self.prefix.clone(); topic @@ -449,7 +450,7 @@ where )), ))) => { let props = [ResponseCode::Error.into()]; - let mut response = Publication::new(b"") + let mut response = Publication::new(b"Serialized value too large") .topic(&topic) .properties(&props) .qos(QoS::AtLeastOnce); @@ -461,10 +462,9 @@ where self.mqtt .client() .publish(response.finish().unwrap()) // Note(unwrap): has topic - .map_err(|err| info!("Dump reply failed: {err:?}")) - .ok(); + .unwrap(); // Note(unwrap): checked can_publish, error message is short } - other => other.unwrap(), // Note(unwrap): checked can_publish + other => other.unwrap(), } } } @@ -482,7 +482,7 @@ where .publish( DeferredPublication::new(|mut buf| { let start = buf.len(); - write!(buf, "", response).and_then(|_| Ok(start - buf.len())) + write!(buf, "{}", response).and_then(|_| Ok(start - buf.len())) }) .reply(request) .properties(&[code.into()]) @@ -491,28 +491,28 @@ where .map_err(minimq::Error::from)?, ) .map_err(|err| { - info!("Response failure: {err:?}"); + debug!("Response failure: {err:?}"); err }) } - fn poll(&mut self, settings: &mut Settings) -> Result> { + fn poll(&mut self, settings: &mut Settings) -> Result> { let Self { mqtt, state, prefix, pending, } = self; - mqtt.poll(|client, topic, payload, properties| -> bool { + mqtt.poll(|client, topic, payload, properties| { let Some(path) = topic .strip_prefix(prefix.as_str()) .and_then(|p| p.strip_prefix("/settings")) else { info!("Unexpected topic: {topic}"); - return false; + return Changed::Unchanged; }; if payload.is_empty() { - // Get, Dump, List + // Get, Dump, or List // Try a Get assuming a leaf node if let Err(err) = client.publish( DeferredPublication::new(|buf| settings.get_json(path, buf)) @@ -538,37 +538,35 @@ where "Root not found" }) }) - .map(|m| { - *pending = m; - state.process_event(sm::Events::Multipart).unwrap(); - }) - .map_err(|response| { - Self::respond( - response, - ResponseCode::Error, - properties, - client, - ) - .ok(); - }) - .ok(); + .map_or_else( + |err| { + Self::respond(err, ResponseCode::Error, properties, client) + .ok(); + }, + |m| { + *pending = m; + state.process_event(sm::Events::Multipart).unwrap(); + // Response comes through iter_list/iter_dump + }, + ); } minimq::PubError::Serialization(err) => { Self::respond(err, ResponseCode::Error, properties, client).ok(); } minimq::PubError::Error(err) => { - error!("Get error failure: {err:?}"); + error!("Get failure: {err:?}"); } } } - false + Changed::Unchanged } else { // Set settings .set_json(path, payload) - .map(|_depth| Self::respond("OK", ResponseCode::Ok, properties, client).ok()) .map_err(|err| Self::respond(err, ResponseCode::Error, properties, client).ok()) + .map(|_depth| Self::respond("OK", ResponseCode::Ok, properties, client).ok()) .is_ok() + .into() } }) .map(Option::unwrap_or_default) @@ -576,9 +574,26 @@ where minimq::Error::SessionReset => { warn!("Session reset"); self.state.process_event(sm::Events::Reset).unwrap(); - Ok(false) + Ok(Changed::Unchanged) } other => Err(other.into()), }) } } + +#[derive(Default, Copy, Clone, PartialEq, PartialOrd)] +enum Changed { + #[default] + Unchanged, + Changed, +} + +impl From for Changed { + fn from(value: bool) -> Self { + if value { + Self::Changed + } else { + Self::Unchanged + } + } +} From 990bf165af874c4ba12f121f598be29b43e1bc94 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 16:40:16 +0200 Subject: [PATCH 11/24] py: rationalize, remove list option, changelog update --- CHANGELOG.md | 11 ++++--- py/miniconf-mqtt/miniconf/__main__.py | 47 +++++++++++++-------------- py/miniconf-mqtt/miniconf/miniconf.py | 24 ++++++++++---- 3 files changed, 46 insertions(+), 36 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b1cdfbb..5bd650a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,10 +17,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `Keys::is_empty()` -> `Keys::finalize()` * `traverse_by_key` ensures `Keys::finalize()` * `NodeIter::count()` -> `NodeIter::exact_size()` to disambiguate from `Iterator::count()` -* The MQTT client performs path listings by publishing an empty payload to an internal node - with a response topic +* [MQTT] path listing are done by publishing an empty payload to an internal node path + with a response topic (not to the `/list` topic anymore) * [MQTT, Python] The `Miniconf::create` method is no longer used. Instead, an `aiomqtt::Client` -must be passed to Miniconf + must be passed to Miniconf +* [MQTT, Python] `--list` option removed in favor of `PATH?` ### Added @@ -32,8 +33,8 @@ must be passed to Miniconf * `Path`, `JsonPath`/`JsonPathIter`, `Indices`, `KeysIter` wrapper types for more ergonomic/succinct `Transcode`/`IntoKeys`/`Keys` handling * [MQTT] support on-demand and partial dump (previously called repubish and executed only once) by posting - the empty payload to an internal node without a response topic -* [MQTT] support partial listing + the empty payload to an internal node without a response topic: `PATH!` +* [MQTT] support partial listing: `PATH?` ### Removed diff --git a/py/miniconf-mqtt/miniconf/__main__.py b/py/miniconf-mqtt/miniconf/__main__.py index 52ae016c..25696241 100644 --- a/py/miniconf-mqtt/miniconf/__main__.py +++ b/py/miniconf-mqtt/miniconf/__main__.py @@ -32,6 +32,8 @@ def main(): %(prog)s -d dt/sinara/dual-iir/+ '/afe/0' # GET %(prog)s -d dt/sinara/dual-iir/+ '/afe/0="G1"' # SET %(prog)s -d dt/sinara/dual-iir/+ '/afe/0=' # CLEAR +%(prog)s -d dt/sinara/dual-iir/+ '/afe?' '?' # DUMP +%(prog)s -d dt/sinara/dual-iir/+ '/afe!' '!' # LIST-GET """, ) parser.add_argument( @@ -50,12 +52,6 @@ def main(): parser.add_argument( "--discover", "-d", action="store_true", help="Detect and list device prefixes" ) - parser.add_argument( - "--list", - "-l", - action="store_true", - help="List all active settings after modification", - ) parser.add_argument( "prefix", type=str, @@ -66,8 +62,8 @@ def main(): metavar="CMD", nargs="*", help="Path to get ('PATH') or path and JSON encoded value to set " - "('PATH=VALUE') or path to clear ('PATH='). " - "Use sufficient shell escaping.", + "('PATH=VALUE') or path to clear ('PATH='). " + "Use sufficient shell escaping.", ) args = parser.parse_args() @@ -95,30 +91,31 @@ async def run(): interface = Miniconf(client, prefix) for arg in args.paths: - assert arg.startswith("/") or arg in ["", "?", "="] - if arg.endswith("?"): - await interface.dump(arg[:-1]) - print(f"Dumped {arg}") - continue - - try: + path = arg.removesuffix("?") + assert path.startswith("/") or not path + for p in await interface.list_paths(path): + value = await interface.get(p) + print(f"List `{p}` = `{value}`") + elif arg.endswith("!"): + path = arg.removesuffix("!") + assert path.startswith("/") or not path + await interface.dump(path) + print(f"Dumped `{path}` into namespace") + elif "=" in arg: path, value = arg.split("=", 1) - except ValueError: - value = await interface.get(arg) - print(f"{arg} = {value}") - else: + assert path.startswith("/") or not path if not value: await interface.clear(path) - print(f"Cleared retained {path}: OK") + print(f"Cleared retained `{path}`") else: await interface.set(path, json.loads(value), args.retain) - print(f"Set {path} to {value}: OK") - - if args.list: - for path in await interface.list_paths(): + print(f"Set `{path}` = `{value}`") + else: + path = arg + assert path.startswith("/") or not path value = await interface.get(path) - print(f"{path} = {value}") + print(f"Get `{path}` = `{value}`") asyncio.run(run()) diff --git a/py/miniconf-mqtt/miniconf/miniconf.py b/py/miniconf-mqtt/miniconf/miniconf.py index 7d4a3473..9a2efbd2 100644 --- a/py/miniconf-mqtt/miniconf/miniconf.py +++ b/py/miniconf-mqtt/miniconf/miniconf.py @@ -141,16 +141,28 @@ async def set(self, path, value, retain=False): payload=json.dumps(value, separators=(",", ":")), retain=retain, ) - assert len(ret) == 1 + assert len(ret) == 1, ret return ret[0] async def list_paths(self, root=""): - """Get a list of all the paths below a given root.""" + """Get a list of all the paths below a given root. + + Args: + root: Path to the root node to list. + """ return await self._do(topic=f"{self.prefix}/settings{root}", payload="") async def dump(self, root=""): - """Dump all the paths at or below a given root into the namespace.""" - return await self._do( + """Dump all the paths at or below a given root into the settings namespace. + + Note that the target Miniconf client may be unable to + responde to messages when a multipart operation (list or dump) is in progress. + This method does not wait for the completion of the dump. + + Args: + root: Path to the root node to dump. + """ + await self._do( topic=f"{self.prefix}/settings{root}", payload="", response=False ) @@ -161,7 +173,7 @@ async def get(self, path): path: The path to get. """ ret = await self._do(topic=f"{self.prefix}/settings{path}", payload="") - assert len(ret) == 1 + assert len(ret) == 1, ret return ret[0] async def clear(self, path): @@ -171,5 +183,5 @@ async def clear(self, path): path: The path to get. """ ret = await self._do(f"{self.prefix}/settings{path}", payload="", retain=True) - assert len(ret) == 1 + assert len(ret) == 1, ret return ret[0] From 2b9e53244e199eb215e6f2e907993f6d62a0d4e2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 17:32:47 +0200 Subject: [PATCH 12/24] adapt the mqtt example to be used with the python client in CI --- .github/workflows/ci.yml | 15 ++---- miniconf_mqtt/README.md | 6 +++ miniconf_mqtt/examples/mqtt.rs | 95 +++++----------------------------- miniconf_mqtt/src/lib.rs | 6 +++ py/test.sh | 19 +++++++ 5 files changed, 47 insertions(+), 94 deletions(-) create mode 100644 py/test.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7debb55d..ae70917b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,9 +57,7 @@ jobs: uses: dtolnay/rust-toolchain@master with: toolchain: ${{ matrix.toolchain }} - - run: cargo check --verbose - - run: cargo build --no-default-features - run: cargo build - run: cargo build --release @@ -74,19 +72,15 @@ jobs: args: - "" - --no-default-features - steps: - uses: actions/checkout@v4 - - - name: Start Mosquitto + - name: Start Broker run: | sudo apt-get install mosquitto sudo service mosquitto start - - uses: dtolnay/rust-toolchain@master with: toolchain: ${{matrix.toolchain}} - - run: cargo test ${{matrix.args }} embedded: @@ -119,12 +113,9 @@ jobs: - mqtt steps: - uses: actions/checkout@v4 - - - name: Start Mosquitto + - name: Start Broker run: | sudo apt-get install mosquitto sudo service mosquitto start - - uses: dtolnay/rust-toolchain@stable - - - run: cargo run --example ${{matrix.example}} + - run: sh py/test.sh diff --git a/miniconf_mqtt/README.md b/miniconf_mqtt/README.md index 07ddd073..253e1c29 100644 --- a/miniconf_mqtt/README.md +++ b/miniconf_mqtt/README.md @@ -2,6 +2,8 @@ This package contains a MQTT client exposing a [`miniconf`](https://crates.io/crates/miniconf) interface via MQTT using [`minimq`](https://crates.io/crates/minimq). +## Command types + | Command | Node | Response Topic | Payload | | --- | --- | --- | --- | | Get | Leaf | set | empty | @@ -9,3 +11,7 @@ This package contains a MQTT client exposing a [`miniconf`](https://crates.io/cr | Dump | | not set | empty | | Set | Leaf | | some | | Error | Internal | | some | + +## Notes + +* A list command will also list paths that are absent at runtime. diff --git a/miniconf_mqtt/examples/mqtt.rs b/miniconf_mqtt/examples/mqtt.rs index 90494418..0454320f 100644 --- a/miniconf_mqtt/examples/mqtt.rs +++ b/miniconf_mqtt/examples/mqtt.rs @@ -1,114 +1,45 @@ use miniconf::Tree; -use minimq::Publication; use std::time::Duration; use std_embedded_nal::Stack; use std_embedded_time::StandardClock; #[derive(Clone, Default, Tree, Debug)] -struct NestedSettings { +struct Inner { frame_rate: u32, } #[derive(Clone, Default, Tree, Debug)] struct Settings { #[tree(depth = 1)] - inner: NestedSettings, - + inner: Inner, #[tree(depth = 1)] amplitude: [f32; 2], - + array: [i32; 4], exit: bool, } -async fn mqtt_client() { - // Construct a Minimq client to the broker for publishing requests. - let mut buffer = [0u8; 1024]; - let mut mqtt: minimq::Minimq<'_, _, _, minimq::broker::NamedBroker> = - minimq::Minimq::new( - Stack, - StandardClock::default(), - minimq::ConfigBuilder::new( - minimq::broker::NamedBroker::new("localhost", Stack).unwrap(), - &mut buffer, - ) - .client_id("tester") - .unwrap() - .keepalive_interval(60), - ); - - // Wait for the broker connection - while !mqtt.client().is_connected() { - mqtt.poll(|_client, _topic, _message, _properties| {}) - .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - } - - log::info!("Test client connected"); - - // Wait momentarily for the other client to connect. - tokio::time::sleep(Duration::from_secs(1)).await; - - // Configure settings. - mqtt.client() - .publish( - Publication::new(b"32.4") - .topic("sample/prefix/settings/amplitude/0") - .finish() - .unwrap(), - ) - .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - - mqtt.client() - .publish( - Publication::new(b"10") - .topic("sample/prefix/settings/inner/frame_rate") - .finish() - .unwrap(), - ) - .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - - mqtt.client() - .publish( - Publication::new(b"true") - .topic("sample/prefix/settings/exit") - .finish() - .unwrap(), - ) - .unwrap(); -} - #[tokio::main] async fn main() { env_logger::init(); - // Spawn a task to send MQTT messages. - tokio::task::spawn(async move { mqtt_client().await }); - let mut buffer = [0u8; 1024]; let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap(); // Construct a settings configuration interface. - let mut client: miniconf_mqtt::MqttClient<'_, _, _, _, minimq::broker::IpBroker, 2> = - miniconf_mqtt::MqttClient::new( - Stack, - "sample/prefix", - StandardClock::default(), - minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60), - ) - .unwrap(); + let mut client = miniconf_mqtt::MqttClient::new( + Stack, + "sample/prefix", + StandardClock::default(), + minimq::ConfigBuilder::<'_, minimq::broker::IpBroker>::new(localhost.into(), &mut buffer) + .keepalive_interval(60), + ) + .unwrap(); let mut settings = Settings::default(); - loop { + while !settings.exit { + tokio::time::sleep(Duration::from_millis(10)).await; if client.update(&mut settings).unwrap() { println!("Settings updated: {:?}", settings); } - - if settings.exit { - break; - } - - tokio::time::sleep(Duration::from_millis(10)).await; } } diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index c61017ce..60891414 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -511,6 +511,12 @@ where info!("Unexpected topic: {topic}"); return Changed::Unchanged; }; + + if !matches!(&state.state(), sm::States::Multipart | sm::States::Single) { + Self::respond("Not ready", ResponseCode::Ok, properties, client).ok(); + return Changed::Unchanged; + } + if payload.is_empty() { // Get, Dump, or List // Try a Get assuming a leaf node diff --git a/py/test.sh b/py/test.sh new file mode 100644 index 00000000..1533ca12 --- /dev/null +++ b/py/test.sh @@ -0,0 +1,19 @@ +#!/bin/sh + +set -e +set -x + +python -m venv .venv +. .venv/bin/activate +python -m pip install -e py/miniconf-mqtt + +cargo run -p miniconf_mqtt --example mqtt & +sleep 3 + +python -m miniconf -b localhost -d 'sample/+' '!' # DUMP +sleep 1 # dump is asynchronous +python -m miniconf -b localhost -d 'sample/+' '?' # LIST +python -m miniconf -b localhost -d 'sample/+' '/amplitude/0=3' '/inner/frame_rate=9' # SET +python -m miniconf -b localhost -d 'sample/+' '/array' # GET +python -m miniconf -b localhost -d 'sample/+' '/inner/frame_rate=' # CLEAR +python -m miniconf -b localhost -d 'sample/+' '/exit=true' # EXIT From 5b4aba53d025974be3638a1f9e5d0c65456a4999 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 17:40:25 +0200 Subject: [PATCH 13/24] py: support Option listing --- miniconf_mqtt/examples/mqtt.rs | 3 +++ py/miniconf-mqtt/miniconf/__main__.py | 7 +++++-- py/miniconf-mqtt/miniconf/miniconf.py | 8 +++++--- py/test.sh | 2 +- 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/miniconf_mqtt/examples/mqtt.rs b/miniconf_mqtt/examples/mqtt.rs index 0454320f..b579fa92 100644 --- a/miniconf_mqtt/examples/mqtt.rs +++ b/miniconf_mqtt/examples/mqtt.rs @@ -15,6 +15,8 @@ struct Settings { #[tree(depth = 1)] amplitude: [f32; 2], array: [i32; 4], + #[tree(depth = 1)] + opt: Option, exit: bool, } @@ -42,4 +44,5 @@ async fn main() { println!("Settings updated: {:?}", settings); } } + println!("Exiting on request"); } diff --git a/py/miniconf-mqtt/miniconf/__main__.py b/py/miniconf-mqtt/miniconf/__main__.py index 25696241..4548b1e7 100644 --- a/py/miniconf-mqtt/miniconf/__main__.py +++ b/py/miniconf-mqtt/miniconf/__main__.py @@ -95,8 +95,11 @@ async def run(): path = arg.removesuffix("?") assert path.startswith("/") or not path for p in await interface.list_paths(path): - value = await interface.get(p) - print(f"List `{p}` = `{value}`") + try: + value = await interface.get(p) + print(f"List `{p}` = `{value}`") + except MiniconfException as err: + print(f"List `{path}`: {repr(err)}") elif arg.endswith("!"): path = arg.removesuffix("!") assert path.startswith("/") or not path diff --git a/py/miniconf-mqtt/miniconf/miniconf.py b/py/miniconf-mqtt/miniconf/miniconf.py index 9a2efbd2..bf57407d 100644 --- a/py/miniconf-mqtt/miniconf/miniconf.py +++ b/py/miniconf-mqtt/miniconf/miniconf.py @@ -22,6 +22,10 @@ class MiniconfException(Exception): """Generic exceptions generated by Miniconf.""" + def __repr__(self): + code, response = self.args + return f"Code: {code}, Message: {response}" + class Miniconf: """An asynchronous API for controlling Miniconf devices using MQTT.""" @@ -102,9 +106,7 @@ def _dispatch(self, message: Message): ret.append(response) fut.set_result(ret) else: - fut.set_exception( - MiniconfException(f"Received code: {code}, Message: {response}") - ) + fut.set_exception(MiniconfException(code, response)) del self._inflight[response_id] async def _do(self, topic: str, *, response=True, **kwargs): diff --git a/py/test.sh b/py/test.sh index 1533ca12..c389bfbd 100644 --- a/py/test.sh +++ b/py/test.sh @@ -8,7 +8,7 @@ python -m venv .venv python -m pip install -e py/miniconf-mqtt cargo run -p miniconf_mqtt --example mqtt & -sleep 3 +sleep 3 # > REPUBLISH_TIMEOUT_SECONDS python -m miniconf -b localhost -d 'sample/+' '!' # DUMP sleep 1 # dump is asynchronous From 3b7d2e4f16b3e850f254a9dbeb1877b51c43d009 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 17:48:00 +0200 Subject: [PATCH 14/24] accept set during wait, fixing repub --- miniconf_mqtt/src/lib.rs | 9 ++------- miniconf_mqtt/tests/integration_test.rs | 21 +++++++++------------ py/miniconf-mqtt/miniconf/__main__.py | 2 +- 3 files changed, 12 insertions(+), 20 deletions(-) diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index 60891414..27f52d76 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -324,7 +324,7 @@ where } sm::States::Init => { info!("Republishing"); - self.publish("").unwrap(); + self.publish("").ok(); } sm::States::Multipart => { if self.pending.response_topic.is_some() { @@ -512,11 +512,6 @@ where return Changed::Unchanged; }; - if !matches!(&state.state(), sm::States::Multipart | sm::States::Single) { - Self::respond("Not ready", ResponseCode::Ok, properties, client).ok(); - return Changed::Unchanged; - } - if payload.is_empty() { // Get, Dump, or List // Try a Get assuming a leaf node @@ -534,7 +529,7 @@ where Traversal::TooShort(_depth), )) => { // Internal node: try Dump or List - (state.state() != &sm::States::Multipart) + (state.state() == &sm::States::Single) .then_some(()) .ok_or("Pending multipart response") .and_then(|()| Multipart::::try_from(properties)) diff --git a/miniconf_mqtt/tests/integration_test.rs b/miniconf_mqtt/tests/integration_test.rs index 400d9266..4f33dd9e 100644 --- a/miniconf_mqtt/tests/integration_test.rs +++ b/miniconf_mqtt/tests/integration_test.rs @@ -81,24 +81,21 @@ fn main() -> std::io::Result<()> { // Construct a Minimq client to the broker for publishing requests. let mut buffer = [0u8; 1024]; let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap(); - let mut mqtt: minimq::Minimq<'_, _, _, minimq::broker::IpBroker> = minimq::Minimq::new( + let mut mqtt = minimq::Minimq::new( Stack, StandardClock::default(), - minimq::ConfigBuilder::new(localhost.into(), &mut buffer), + minimq::ConfigBuilder::::new(localhost.into(), &mut buffer), ); let mut buffer = [0u8; 1024]; - let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap(); - // Construct a settings configuration interface. - let mut interface: miniconf_mqtt::MqttClient<'_, _, _, _, minimq::broker::IpBroker, 2> = - miniconf_mqtt::MqttClient::new( - Stack, - "device", - StandardClock::default(), - minimq::ConfigBuilder::new(localhost.into(), &mut buffer), - ) - .unwrap(); + let mut interface = miniconf_mqtt::MqttClient::new( + Stack, + "device", + StandardClock::default(), + minimq::ConfigBuilder::::new(localhost.into(), &mut buffer), + ) + .unwrap(); // We will wait 100ms in between each state to allow the MQTT broker to catch up let mut state = TestState::started(); diff --git a/py/miniconf-mqtt/miniconf/__main__.py b/py/miniconf-mqtt/miniconf/__main__.py index 4548b1e7..29c1559e 100644 --- a/py/miniconf-mqtt/miniconf/__main__.py +++ b/py/miniconf-mqtt/miniconf/__main__.py @@ -99,7 +99,7 @@ async def run(): value = await interface.get(p) print(f"List `{p}` = `{value}`") except MiniconfException as err: - print(f"List `{path}`: {repr(err)}") + print(f"List `{p}`: {repr(err)}") elif arg.endswith("!"): path = arg.removesuffix("!") assert path.startswith("/") or not path From 147ef9b54f5ff2bdf380a993ff99f64d5c2d6b9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 17:52:58 +0200 Subject: [PATCH 15/24] py: typing extensions dep --- py/miniconf-mqtt/pyproject.toml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/py/miniconf-mqtt/pyproject.toml b/py/miniconf-mqtt/pyproject.toml index 076b8ed1..2d9e24ae 100644 --- a/py/miniconf-mqtt/pyproject.toml +++ b/py/miniconf-mqtt/pyproject.toml @@ -14,9 +14,7 @@ authors = [ { name = "Ryan Summers", email = "ryan.summers@vertigo-designs.com" }, { name = "Robert Jördens", email = "rj@quartiq.de" }, ] -dependencies = [ - "aiomqtt >= 2.2.0", -] +dependencies = ["aiomqtt >= 2.2.0", "typing_extensions >= 4.12"] [project.urls] Homepage = "https://github.com/quartiq/miniconf" From 6963630232c853b737bebdbeab46154d324cca1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 17:55:15 +0200 Subject: [PATCH 16/24] build example first --- py/test.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/py/test.sh b/py/test.sh index c389bfbd..c91c4179 100644 --- a/py/test.sh +++ b/py/test.sh @@ -7,6 +7,7 @@ python -m venv .venv . .venv/bin/activate python -m pip install -e py/miniconf-mqtt +cargo build -p miniconf_mqtt --example mqtt & cargo run -p miniconf_mqtt --example mqtt & sleep 3 # > REPUBLISH_TIMEOUT_SECONDS From 293066ba8899289381548693e1d94fdea21c404e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 17:56:02 +0200 Subject: [PATCH 17/24] build example sync --- py/test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/test.sh b/py/test.sh index c91c4179..adc8d821 100644 --- a/py/test.sh +++ b/py/test.sh @@ -7,7 +7,7 @@ python -m venv .venv . .venv/bin/activate python -m pip install -e py/miniconf-mqtt -cargo build -p miniconf_mqtt --example mqtt & +cargo build -p miniconf_mqtt --example mqtt cargo run -p miniconf_mqtt --example mqtt & sleep 3 # > REPUBLISH_TIMEOUT_SECONDS From cc0b7671fe7a862cca380dd920b56bee17a88362 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 17:59:16 +0200 Subject: [PATCH 18/24] pylint --- py/miniconf-mqtt/miniconf/miniconf.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/py/miniconf-mqtt/miniconf/miniconf.py b/py/miniconf-mqtt/miniconf/miniconf.py index bf57407d..c0ef7aaa 100644 --- a/py/miniconf-mqtt/miniconf/miniconf.py +++ b/py/miniconf-mqtt/miniconf/miniconf.py @@ -22,9 +22,12 @@ class MiniconfException(Exception): """Generic exceptions generated by Miniconf.""" + def __init__(self, code, message): + self.code = code + self.message = message + def __repr__(self): - code, response = self.args - return f"Code: {code}, Message: {response}" + return f"Code: {self.code}, Message: {self.message}" class Miniconf: From 4935e2d9b85de041e0998e9c26d29a5e88b16169 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 18:03:33 +0200 Subject: [PATCH 19/24] py: exception --- py/miniconf-mqtt/miniconf/__main__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/py/miniconf-mqtt/miniconf/__main__.py b/py/miniconf-mqtt/miniconf/__main__.py index 29c1559e..577b4228 100644 --- a/py/miniconf-mqtt/miniconf/__main__.py +++ b/py/miniconf-mqtt/miniconf/__main__.py @@ -80,6 +80,7 @@ async def run(): devices = await discover(client, args.prefix) if len(devices) != 1: raise MiniconfException( + "Discover" f"No unique Miniconf device (found `{devices}`). " "Please specify a `--prefix`" ) From 528f18486b203cb734f41a68fb4dc33366960185 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 18:04:59 +0200 Subject: [PATCH 20/24] syntax... --- py/miniconf-mqtt/miniconf/__main__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/py/miniconf-mqtt/miniconf/__main__.py b/py/miniconf-mqtt/miniconf/__main__.py index 577b4228..b9fcc676 100644 --- a/py/miniconf-mqtt/miniconf/__main__.py +++ b/py/miniconf-mqtt/miniconf/__main__.py @@ -80,9 +80,9 @@ async def run(): devices = await discover(client, args.prefix) if len(devices) != 1: raise MiniconfException( - "Discover" + "Discover", f"No unique Miniconf device (found `{devices}`). " - "Please specify a `--prefix`" + "Please specify a `--prefix`", ) prefix = devices.pop() logging.info("Found device prefix: %s", prefix) From 080418c03037ce7cb9e23c8a3ebd3076e8096644 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 22:35:17 +0200 Subject: [PATCH 21/24] match examples --- miniconf_mqtt/examples/mqtt.rs | 15 ++++++++++++++- miniconf_mqtt/src/lib.rs | 2 ++ py/miniconf-mqtt/miniconf/__main__.py | 16 ++++++++-------- py/test.sh | 14 ++++++++------ 4 files changed, 32 insertions(+), 15 deletions(-) diff --git a/miniconf_mqtt/examples/mqtt.rs b/miniconf_mqtt/examples/mqtt.rs index b579fa92..435d402a 100644 --- a/miniconf_mqtt/examples/mqtt.rs +++ b/miniconf_mqtt/examples/mqtt.rs @@ -1,4 +1,6 @@ +use heapless::String; use miniconf::Tree; +use serde::{Deserialize, Serialize}; use std::time::Duration; use std_embedded_nal::Stack; use std_embedded_time::StandardClock; @@ -8,8 +10,19 @@ struct Inner { frame_rate: u32, } +#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)] +enum Gain { + #[default] + G1, + G10, + G100, +} + #[derive(Clone, Default, Tree, Debug)] struct Settings { + stream: String<32>, + #[tree(depth = 1)] + afe: [Gain; 2], #[tree(depth = 1)] inner: Inner, #[tree(depth = 1)] @@ -30,7 +43,7 @@ async fn main() { // Construct a settings configuration interface. let mut client = miniconf_mqtt::MqttClient::new( Stack, - "sample/prefix", + "dt/sinara/dual-iir/01-02-03-04-05-06", StandardClock::default(), minimq::ConfigBuilder::<'_, minimq::broker::IpBroker>::new(localhost.into(), &mut buffer) .keepalive_interval(60), diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index 27f52d76..3acbbfa7 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -430,8 +430,10 @@ where .and_then(|_| topic.push_str(&path)) .unwrap(); + let props = [ResponseCode::Ok.into()]; let mut response = DeferredPublication::new(|buf| settings.get_json_by_key(&path, buf)) .topic(&topic) + .properties(&props) .qos(QoS::AtLeastOnce); if let Some(cd) = &self.pending.correlation_data { diff --git a/py/miniconf-mqtt/miniconf/__main__.py b/py/miniconf-mqtt/miniconf/__main__.py index b9fcc676..f34a893e 100644 --- a/py/miniconf-mqtt/miniconf/__main__.py +++ b/py/miniconf-mqtt/miniconf/__main__.py @@ -28,12 +28,12 @@ def main(): description="Miniconf command line interface.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""Examples: -%(prog)s dt/sinara/dual-iir/01-02-03-04-05-06 '/stream_target="192.0.2.16:9293"' +%(prog)s dt/sinara/dual-iir/01-02-03-04-05-06 '/stream="192.0.2.16:9293"' %(prog)s -d dt/sinara/dual-iir/+ '/afe/0' # GET %(prog)s -d dt/sinara/dual-iir/+ '/afe/0="G1"' # SET %(prog)s -d dt/sinara/dual-iir/+ '/afe/0=' # CLEAR -%(prog)s -d dt/sinara/dual-iir/+ '/afe?' '?' # DUMP -%(prog)s -d dt/sinara/dual-iir/+ '/afe!' '!' # LIST-GET +%(prog)s -d dt/sinara/dual-iir/+ '/afe?' '?' # LIST-GET +%(prog)s -d dt/sinara/dual-iir/+ '/afe!' # DUMP """, ) parser.add_argument( @@ -58,11 +58,12 @@ def main(): help="The MQTT topic prefix of the target or a prefix filter for discovery", ) parser.add_argument( - "paths", + "commands", metavar="CMD", nargs="*", help="Path to get ('PATH') or path and JSON encoded value to set " - "('PATH=VALUE') or path to clear ('PATH='). " + "('PATH=VALUE') or path to clear ('PATH=') or path to list (`PATH?`) or " + "path to dump (`PATH!`). " "Use sufficient shell escaping.", ) args = parser.parse_args() @@ -81,8 +82,7 @@ async def run(): if len(devices) != 1: raise MiniconfException( "Discover", - f"No unique Miniconf device (found `{devices}`). " - "Please specify a `--prefix`", + f"No unique Miniconf device (found `{devices}`)." ) prefix = devices.pop() logging.info("Found device prefix: %s", prefix) @@ -91,7 +91,7 @@ async def run(): interface = Miniconf(client, prefix) - for arg in args.paths: + for arg in args.commands: if arg.endswith("?"): path = arg.removesuffix("?") assert path.startswith("/") or not path diff --git a/py/test.sh b/py/test.sh index adc8d821..473a38e0 100644 --- a/py/test.sh +++ b/py/test.sh @@ -11,10 +11,12 @@ cargo build -p miniconf_mqtt --example mqtt cargo run -p miniconf_mqtt --example mqtt & sleep 3 # > REPUBLISH_TIMEOUT_SECONDS -python -m miniconf -b localhost -d 'sample/+' '!' # DUMP +python -m miniconf -b localhost dt/sinara/dual-iir/01-02-03-04-05-06 '/stream="192.0.2.16:9293"' +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0' # GET +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0="G1"' # SET +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0=' # CLEAR +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe?' '?' # LIST-GET +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe!' # DUMP sleep 1 # dump is asynchronous -python -m miniconf -b localhost -d 'sample/+' '?' # LIST -python -m miniconf -b localhost -d 'sample/+' '/amplitude/0=3' '/inner/frame_rate=9' # SET -python -m miniconf -b localhost -d 'sample/+' '/array' # GET -python -m miniconf -b localhost -d 'sample/+' '/inner/frame_rate=' # CLEAR -python -m miniconf -b localhost -d 'sample/+' '/exit=true' # EXIT + +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/exit=true' From f1d3ef3e5a538c8300a74c7b4af920df9c755816 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Mon, 8 Jul 2024 22:40:39 +0200 Subject: [PATCH 22/24] no response no inflight --- py/miniconf-mqtt/miniconf/miniconf.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/py/miniconf-mqtt/miniconf/miniconf.py b/py/miniconf-mqtt/miniconf/miniconf.py index c0ef7aaa..78291afb 100644 --- a/py/miniconf-mqtt/miniconf/miniconf.py +++ b/py/miniconf-mqtt/miniconf/miniconf.py @@ -115,15 +115,15 @@ def _dispatch(self, message: Message): async def _do(self, topic: str, *, response=True, **kwargs): await self.subscribed.wait() - request_id = uuid.uuid1().bytes - fut = asyncio.get_event_loop().create_future() - assert request_id not in self._inflight - self._inflight[request_id] = fut, [] - props = Properties(PacketTypes.PUBLISH) + request_id = uuid.uuid1().bytes + props.CorrelationData = request_id if response: + fut = asyncio.get_event_loop().create_future() + assert request_id not in self._inflight + self._inflight[request_id] = fut, [] props.ResponseTopic = self.response_topic - props.CorrelationData = request_id + LOGGER.info(f"Publishing {topic}: {kwargs['payload']}, [{props}]") await self.client.publish( topic, From 38a1536f35b26bbd930a76842ccc79b51f8e0958 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Robert=20J=C3=B6rdens?= Date: Tue, 9 Jul 2024 00:14:22 +0200 Subject: [PATCH 23/24] don't subscribe to list --- miniconf_mqtt/src/lib.rs | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index 3acbbfa7..8a2ea077 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -357,14 +357,8 @@ where fn subscribe(&mut self) -> bool { let mut settings = self.prefix.clone(); settings.push_str("/settings/#").unwrap(); - let mut list = self.prefix.clone(); - list.push_str("/list").unwrap(); let opts = SubscriptionOptions::default().ignore_local_messages(); - let topics = [ - TopicFilter::new(&settings).options(opts), - TopicFilter::new(&list).options(opts), - ]; - + let topics = [TopicFilter::new(&settings).options(opts)]; self.mqtt.client().subscribe(&topics, &[]).is_ok() } From 7b1286307fec21a86254d511c92c4aeb8359828c Mon Sep 17 00:00:00 2001 From: Ryan Summers Date: Tue, 9 Jul 2024 11:50:43 +0200 Subject: [PATCH 24/24] Update py/miniconf-mqtt/miniconf/miniconf.py --- py/miniconf-mqtt/miniconf/miniconf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/py/miniconf-mqtt/miniconf/miniconf.py b/py/miniconf-mqtt/miniconf/miniconf.py index 78291afb..1272ca6d 100644 --- a/py/miniconf-mqtt/miniconf/miniconf.py +++ b/py/miniconf-mqtt/miniconf/miniconf.py @@ -161,7 +161,7 @@ async def dump(self, root=""): """Dump all the paths at or below a given root into the settings namespace. Note that the target Miniconf client may be unable to - responde to messages when a multipart operation (list or dump) is in progress. + respond to messages when a multipart operation (list or dump) is in progress. This method does not wait for the completion of the dump. Args: