diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7debb55d..ae70917b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -57,9 +57,7 @@ jobs: uses: dtolnay/rust-toolchain@master with: toolchain: ${{ matrix.toolchain }} - - run: cargo check --verbose - - run: cargo build --no-default-features - run: cargo build - run: cargo build --release @@ -74,19 +72,15 @@ jobs: args: - "" - --no-default-features - steps: - uses: actions/checkout@v4 - - - name: Start Mosquitto + - name: Start Broker run: | sudo apt-get install mosquitto sudo service mosquitto start - - uses: dtolnay/rust-toolchain@master with: toolchain: ${{matrix.toolchain}} - - run: cargo test ${{matrix.args }} embedded: @@ -119,12 +113,9 @@ jobs: - mqtt steps: - uses: actions/checkout@v4 - - - name: Start Mosquitto + - name: Start Broker run: | sudo apt-get install mosquitto sudo service mosquitto start - - uses: dtolnay/rust-toolchain@stable - - - run: cargo run --example ${{matrix.example}} + - run: sh py/test.sh diff --git a/CHANGELOG.md b/CHANGELOG.md index ffc24356..5bd650a5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `Keys::is_empty()` -> `Keys::finalize()` * `traverse_by_key` ensures `Keys::finalize()` * `NodeIter::count()` -> `NodeIter::exact_size()` to disambiguate from `Iterator::count()` +* [MQTT] path listing are done by publishing an empty payload to an internal node path + with a response topic (not to the `/list` topic anymore) +* [MQTT, Python] The `Miniconf::create` method is no longer used. Instead, an `aiomqtt::Client` + must be passed to Miniconf +* [MQTT, Python] `--list` option removed in favor of `PATH?` ### Added @@ -27,16 +32,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * `miniconf_cli`: a menu/command line interface * `Path`, `JsonPath`/`JsonPathIter`, `Indices`, `KeysIter` wrapper types for more ergonomic/succinct `Transcode`/`IntoKeys`/`Keys` handling +* [MQTT] support on-demand and partial dump (previously called repubish and executed only once) by posting + the empty payload to an internal node without a response topic: `PATH!` +* [MQTT] support partial listing: `PATH?` ### Removed * `digits()` gone in favor of using `usize::checked_ilog10()` * `rust_version` and `MSRV`: these crates aim to support the latest stable version of rust -### Changed -* [breaking-python] The `Miniconf::create` method is no longer used. Instead, an `aiomqtt::Client` -must be passed to Miniconf - ## [0.11.0](https://github.com/quartiq/miniconf/compare/v0.10.1...v0.11.0) - 2024-04-30 ### Changed diff --git a/miniconf/src/jsonpath.rs b/miniconf/src/jsonpath.rs index ffb023bd..a4576213 100644 --- a/miniconf/src/jsonpath.rs +++ b/miniconf/src/jsonpath.rs @@ -1,6 +1,6 @@ use core::{ fmt::Write, - ops::{Deref, DerefMut}, + ops::{ControlFlow::*, Deref, DerefMut}, }; use serde::{Deserialize, Serialize}; @@ -57,17 +57,16 @@ impl<'a> Iterator for JsonPathIter<'a> { type Item = &'a str; fn next(&mut self) -> Option { - // Reappropriation of `Result` as `Either` for (open, close) in [ - (".'", Ok("'")), // "'" inclusive - (".", Err(&['.', '['][..])), // '.' or '[' exclusive - ("['", Ok("']")), // "']" inclusive - ("[", Ok("]")), // "]" inclusive + (".'", Continue("'")), // "'" inclusive + (".", Break(&['.', '['][..])), // '.' or '[' exclusive + ("['", Continue("']")), // "']" inclusive + ("[", Continue("]")), // "]" inclusive ] { if let Some(rest) = self.0.strip_prefix(open) { let (end, sep) = match close { - Err(close) => (rest.find(close).unwrap_or(rest.len()), 0), - Ok(close) => (rest.find(close)?, close.len()), + Break(close) => (rest.find(close).unwrap_or(rest.len()), 0), + Continue(close) => (rest.find(close)?, close.len()), }; let (next, rest) = rest.split_at(end); self.0 = &rest[sep..]; diff --git a/miniconf_cli/Cargo.toml b/miniconf_cli/Cargo.toml index 3f87b1f9..7eab86b2 100644 --- a/miniconf_cli/Cargo.toml +++ b/miniconf_cli/Cargo.toml @@ -19,7 +19,7 @@ miniconf = { version = "0.11.0", path = "../miniconf", features = [ ] } postcard = "1.0.8" serde-json-core = "0.5.1" -yafnv = "1.0.0" +yafnv = "2.0.0" [features] std = [] diff --git a/miniconf_cli/src/lib.rs b/miniconf_cli/src/lib.rs index 03f5798e..c45ab0a8 100644 --- a/miniconf_cli/src/lib.rs +++ b/miniconf_cli/src/lib.rs @@ -217,7 +217,7 @@ where (val as _, rest) } }; - let check: u32 = yafnv::fnv1a(val.iter().copied()); + let check: u32 = yafnv::fnv1a(val); awrite(&mut write, " ".as_bytes()).await?; let rl = rest.len(); let mut sl = &mut rest[..]; @@ -234,7 +234,7 @@ where Err(miniconf::Error::Traversal(Traversal::Absent(_depth))) => "absent".as_bytes(), ret => &buf[..ret?], }; - if yafnv::fnv1a::(def.iter().copied()) == check { + if yafnv::fnv1a::(def) == check { awrite(&mut write, " (default)\n".as_bytes()).await?; } else { awrite(&mut write, " (default: ".as_bytes()).await?; diff --git a/miniconf_mqtt/Cargo.toml b/miniconf_mqtt/Cargo.toml index d84807b6..9c499c5b 100644 --- a/miniconf_mqtt/Cargo.toml +++ b/miniconf_mqtt/Cargo.toml @@ -18,7 +18,7 @@ std = [] [dependencies] miniconf = { version = "0.11", features = ["json-core"], default-features = false, path = "../miniconf" } minimq = "0.9.0" -smlang = "0.6" +smlang = "0.7" embedded-io = "0.6" log = "0.4" heapless = "0.8" diff --git a/miniconf_mqtt/README.md b/miniconf_mqtt/README.md index d010d150..253e1c29 100644 --- a/miniconf_mqtt/README.md +++ b/miniconf_mqtt/README.md @@ -1,3 +1,17 @@ # `miniconf` MQTT Client This package contains a MQTT client exposing a [`miniconf`](https://crates.io/crates/miniconf) interface via MQTT using [`minimq`](https://crates.io/crates/minimq). + +## Command types + +| Command | Node | Response Topic | Payload | +| --- | --- | --- | --- | +| Get | Leaf | set | empty | +| List | Internal | set | empty | +| Dump | | not set | empty | +| Set | Leaf | | some | +| Error | Internal | | some | + +## Notes + +* A list command will also list paths that are absent at runtime. diff --git a/miniconf_mqtt/examples/mqtt.rs b/miniconf_mqtt/examples/mqtt.rs index 90494418..435d402a 100644 --- a/miniconf_mqtt/examples/mqtt.rs +++ b/miniconf_mqtt/examples/mqtt.rs @@ -1,114 +1,61 @@ +use heapless::String; use miniconf::Tree; -use minimq::Publication; +use serde::{Deserialize, Serialize}; use std::time::Duration; use std_embedded_nal::Stack; use std_embedded_time::StandardClock; #[derive(Clone, Default, Tree, Debug)] -struct NestedSettings { +struct Inner { frame_rate: u32, } +#[derive(Copy, Clone, Default, Debug, Serialize, Deserialize)] +enum Gain { + #[default] + G1, + G10, + G100, +} + #[derive(Clone, Default, Tree, Debug)] struct Settings { + stream: String<32>, #[tree(depth = 1)] - inner: NestedSettings, - + afe: [Gain; 2], + #[tree(depth = 1)] + inner: Inner, #[tree(depth = 1)] amplitude: [f32; 2], - + array: [i32; 4], + #[tree(depth = 1)] + opt: Option, exit: bool, } -async fn mqtt_client() { - // Construct a Minimq client to the broker for publishing requests. - let mut buffer = [0u8; 1024]; - let mut mqtt: minimq::Minimq<'_, _, _, minimq::broker::NamedBroker> = - minimq::Minimq::new( - Stack, - StandardClock::default(), - minimq::ConfigBuilder::new( - minimq::broker::NamedBroker::new("localhost", Stack).unwrap(), - &mut buffer, - ) - .client_id("tester") - .unwrap() - .keepalive_interval(60), - ); - - // Wait for the broker connection - while !mqtt.client().is_connected() { - mqtt.poll(|_client, _topic, _message, _properties| {}) - .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - } - - log::info!("Test client connected"); - - // Wait momentarily for the other client to connect. - tokio::time::sleep(Duration::from_secs(1)).await; - - // Configure settings. - mqtt.client() - .publish( - Publication::new(b"32.4") - .topic("sample/prefix/settings/amplitude/0") - .finish() - .unwrap(), - ) - .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - - mqtt.client() - .publish( - Publication::new(b"10") - .topic("sample/prefix/settings/inner/frame_rate") - .finish() - .unwrap(), - ) - .unwrap(); - tokio::time::sleep(Duration::from_millis(100)).await; - - mqtt.client() - .publish( - Publication::new(b"true") - .topic("sample/prefix/settings/exit") - .finish() - .unwrap(), - ) - .unwrap(); -} - #[tokio::main] async fn main() { env_logger::init(); - // Spawn a task to send MQTT messages. - tokio::task::spawn(async move { mqtt_client().await }); - let mut buffer = [0u8; 1024]; let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap(); // Construct a settings configuration interface. - let mut client: miniconf_mqtt::MqttClient<'_, _, _, _, minimq::broker::IpBroker, 2> = - miniconf_mqtt::MqttClient::new( - Stack, - "sample/prefix", - StandardClock::default(), - minimq::ConfigBuilder::new(localhost.into(), &mut buffer).keepalive_interval(60), - ) - .unwrap(); + let mut client = miniconf_mqtt::MqttClient::new( + Stack, + "dt/sinara/dual-iir/01-02-03-04-05-06", + StandardClock::default(), + minimq::ConfigBuilder::<'_, minimq::broker::IpBroker>::new(localhost.into(), &mut buffer) + .keepalive_interval(60), + ) + .unwrap(); let mut settings = Settings::default(); - loop { + while !settings.exit { + tokio::time::sleep(Duration::from_millis(10)).await; if client.update(&mut settings).unwrap() { println!("Settings updated: {:?}", settings); } - - if settings.exit { - break; - } - - tokio::time::sleep(Duration::from_millis(10)).await; } + println!("Exiting on request"); } diff --git a/miniconf_mqtt/src/lib.rs b/miniconf_mqtt/src/lib.rs index 14c06825..8a2ea077 100644 --- a/miniconf_mqtt/src/lib.rs +++ b/miniconf_mqtt/src/lib.rs @@ -5,19 +5,22 @@ #![forbid(unsafe_code)] //! The Minimq MQTT client for `miniconf``. +use core::fmt::Display; + use heapless::{String, Vec}; -use miniconf::{Error, JsonCoreSlash, NodeIter, Path, Traversal, TreeKey}; +use log::{debug, error, info, warn}; +use miniconf::{IntoKeys, JsonCoreSlash, NodeIter, Path, Traversal, TreeKey}; pub use minimq; use minimq::{ embedded_nal::TcpClientStack, embedded_time, - types::{SubscriptionOptions, TopicFilter}, - DeferredPublication, Publication, QoS, + types::{Properties, SubscriptionOptions, TopicFilter}, + ConfigBuilder, DeferredPublication, ProtocolError, Publication, QoS, }; use embedded_io::Write; -// The maximum topic length of any settings path. +// The maximum topic length of any topic (prefix + "/settings" + miniconf path). const MAX_TOPIC_LENGTH: usize = 128; // The maximum amount of correlation data that will be cached for listing. This is set to function @@ -28,102 +31,170 @@ const MAX_CD_LENGTH: usize = 32; // republished. const REPUBLISH_TIMEOUT_SECONDS: u32 = 2; -type Iter = NodeIter, '/'>>; +const SEPARATOR: char = '/'; + +type Iter = NodeIter, SEPARATOR>>; + +/// Miniconf MQTT joint error type +#[derive(Debug, PartialEq)] +pub enum Error { + /// Miniconf + Miniconf(miniconf::Error<()>), + /// State machine + State(sm::Error), + /// Minimq + Minimq(minimq::Error), +} + +impl From for Error { + fn from(value: sm::Error) -> Self { + Self::State(value) + } +} + +impl From> for Error { + fn from(value: miniconf::Error) -> Self { + Self::Miniconf(match value { + miniconf::Error::Finalization(_) => miniconf::Error::Finalization(()), + miniconf::Error::Inner(depth, _) => miniconf::Error::Inner(depth, ()), + miniconf::Error::Traversal(t) => miniconf::Error::Traversal(t), + _ => unimplemented!(), + }) + } +} + +impl From for Error { + fn from(value: miniconf::Traversal) -> Self { + Self::Miniconf(value.into()) + } +} + +impl From> for Error { + fn from(value: minimq::Error) -> Self { + Self::Minimq(value) + } +} mod sm { - use super::{Iter, TreeKey, REPUBLISH_TIMEOUT_SECONDS}; + use super::REPUBLISH_TIMEOUT_SECONDS; use minimq::embedded_time::{self, duration::Extensions, Instant}; use smlang::statemachine; statemachine! { transitions: { - *Initial + Connected = ConnectedToBroker, - ConnectedToBroker + IndicatedLife = PendingSubscribe, - - // After initial subscriptions, we start a timeout to republish all settings. - PendingSubscribe + Subscribed / start_republish_timeout = PendingRepublish, - - // Settings republish can be completed any time after subscription. - PendingRepublish + StartRepublish / start_republish = RepublishingSettings, - RepublishingSettings + StartRepublish / start_republish = RepublishingSettings, - Active + StartRepublish / start_republish = RepublishingSettings, - - // After republishing settings, we are in an idle "active" state. - RepublishingSettings + Complete = Active, - - // All states transition back to `initial` on reset. - _ + Reset = Initial, + *Connect + Connect = Alive, + Alive + Alive = Subscribe, + Subscribe + Subscribe / start_timeout = Wait, + Wait + Tick [timed_out] = Init, + Init + Multipart = Multipart, + Multipart + Complete = Single, + Single + Multipart = Multipart, + _ + Reset = Connect, } } - pub struct Context, const Y: usize> { + pub struct Context { clock: C, timeout: Option>, - pub republish_state: Iter, } - impl, const Y: usize> Context { + impl Context { pub fn new(clock: C) -> Self { Self { clock, timeout: None, - republish_state: M::nodes(), } } + } - pub fn republish_has_timed_out(&self) -> bool { - if let Some(timeout) = self.timeout { - self.clock.try_now().unwrap() > timeout - } else { - false - } + impl StateMachineContext for Context { + fn timed_out(&self) -> Result { + Ok(self + .timeout + .map(|t| self.clock.try_now().unwrap() >= t) + .unwrap_or_default()) } - } - impl, const Y: usize> StateMachineContext - for Context - { - fn start_republish_timeout(&mut self) { + fn start_timeout(&mut self) -> Result<(), ()> { self.timeout .replace(self.clock.try_now().unwrap() + REPUBLISH_TIMEOUT_SECONDS.seconds()); + Ok(()) } + } +} + +/// Cache correlation data and topic for multi-part responses. +struct Multipart, const Y: usize> { + iter: Iter, + response_topic: Option>, + correlation_data: Option>, +} - fn start_republish(&mut self) { - self.republish_state = M::nodes(); +impl, const Y: usize> Default for Multipart { + fn default() -> Self { + Self { + iter: M::nodes(), + response_topic: None, + correlation_data: None, } } } -enum Command<'a> { - List, - Get { path: &'a str }, - Set { path: &'a str, value: &'a [u8] }, +impl, const Y: usize> Multipart { + fn root(mut self, keys: K) -> Result { + self.iter = self.iter.root(keys)?; + Ok(self) + } } -impl<'a> Command<'a> { - fn from_message(topic: &'a str, value: &'a [u8]) -> Result { - let path = topic.strip_prefix('/').unwrap_or(topic); - - if path == "list" { - Ok(Command::List) - } else { - match path.strip_prefix("settings") { - Some(path) => { - if value.is_empty() { - Ok(Command::Get { path }) - } else { - Ok(Command::Set { path, value }) - } +impl, const Y: usize> TryFrom<&minimq::types::Properties<'_>> for Multipart { + type Error = &'static str; + fn try_from(value: &minimq::types::Properties<'_>) -> Result { + let response_topic = value + .into_iter() + .response_topic() + .map(TryInto::try_into) + .transpose() + .or(Err("Response topic too long"))?; + let correlation_data = value + .into_iter() + .find_map(|prop| { + if let Ok(minimq::Property::CorrelationData(cd)) = prop { + Some(Vec::try_from(cd.0)) + } else { + None } - _ => Err(()), - } - } + }) + .transpose() + .or(Err("Correlation data too long"))?; + Ok(Self { + iter: M::nodes(), + response_topic, + correlation_data, + }) } } -struct ListCache { - topic: String, - correlation_data: Option>, +#[derive(Debug, Copy, Clone, PartialEq)] +enum ResponseCode { + Ok, + Continue, + Error, +} + +impl From for minimq::Property<'static> { + fn from(value: ResponseCode) -> Self { + let string = match value { + ResponseCode::Ok => "Ok", + ResponseCode::Continue => "Continue", + ResponseCode::Error => "Error", + }; + + minimq::Property::UserProperty( + minimq::types::Utf8String("code"), + minimq::types::Utf8String(string), + ) + } } /// MQTT settings interface. @@ -172,9 +243,9 @@ where Broker: minimq::Broker, { mqtt: minimq::Minimq<'buf, Stack, Clock, Broker>, - state: sm::StateMachine>, + state: sm::StateMachine>, prefix: String, - listing_state: Option<(ListCache, Iter)>, + pending: Multipart, } impl<'buf, Settings, Stack, Clock, Broker, const Y: usize> @@ -196,384 +267,330 @@ where stack: Stack, prefix: &str, clock: Clock, - config: minimq::ConfigBuilder<'buf, Broker>, - ) -> Result { + config: ConfigBuilder<'buf, Broker>, + ) -> Result { + assert!( + prefix.len() + "/settings".len() + Settings::metadata().max_length("/") + <= MAX_TOPIC_LENGTH + ); + // Configure a will so that we can indicate whether or not we are connected. let prefix = String::try_from(prefix).unwrap(); - let mut connection_topic = prefix.clone(); - connection_topic.push_str("/alive").unwrap(); - let will = minimq::Will::new(&connection_topic, b"0", &[])? + let mut alive = prefix.clone(); + alive.push_str("/alive").unwrap(); + let will = minimq::Will::new(&alive, b"0", &[])? .retained() .qos(QoS::AtMostOnce); - let config = config.autodowngrade_qos().will(will)?; - let mqtt = minimq::Minimq::new(stack, clock.clone(), config); - - let max_length = Settings::metadata().max_length("/"); - assert!(prefix.len() + "/settings".len() + max_length <= MAX_TOPIC_LENGTH); - Ok(Self { - mqtt, + mqtt: minimq::Minimq::new(stack, clock.clone(), config), state: sm::StateMachine::new(sm::Context::new(clock)), prefix, - listing_state: None, + pending: Multipart::default(), }) } - fn handle_listing(&mut self) { - let Some((cache, iter)) = &mut self.listing_state else { - return; - }; + /// Update the MQTT interface and service the network. + /// + /// # Returns + /// True if the settings changed. False otherwise. + pub fn update(&mut self, settings: &mut Settings) -> Result> { + if !self.mqtt.client().is_connected() { + // Note(unwrap): It's always safe to reset. + self.state.process_event(sm::Events::Reset).unwrap(); + } - while self.mqtt.client().can_publish(QoS::AtLeastOnce) { - // Note(unwrap): Publishing should not fail because `can_publish()` was checked before - // attempting this publish. - let (code, path) = match iter.next() { - Some(path) => (ResponseCode::Continue, path.unwrap().0.into_inner()), - None => (ResponseCode::Ok, String::new()), - }; + match self.state.state() { + sm::States::Connect => { + if self.mqtt.client().is_connected() { + info!("Connected"); + self.state.process_event(sm::Events::Connect).unwrap(); + } + } + sm::States::Alive => { + if self.alive() { + self.state.process_event(sm::Events::Alive).unwrap(); + } + } + sm::States::Subscribe => { + if self.subscribe() { + info!("Subscribed"); + self.state.process_event(sm::Events::Subscribe).unwrap(); + } + } + sm::States::Wait => { + self.state.process_event(sm::Events::Tick).ok(); + } + sm::States::Init => { + info!("Republishing"); + self.publish("").ok(); + } + sm::States::Multipart => { + if self.pending.response_topic.is_some() { + self.iter_list(); + } else { + self.iter_dump(settings); + } + } + sm::States::Single => { // handled in poll() + } + } + // All states must handle MQTT traffic. + self.poll(settings).map(|c| c == Changed::Changed) + } + + fn alive(&mut self) -> bool { + // Publish a connection status message. + let mut alive = self.prefix.clone(); + alive.push_str("/alive").unwrap(); + let msg = Publication::new(b"1") + .topic(&alive) + .qos(QoS::AtLeastOnce) + .retain() + .finish() + .unwrap(); // Note(unwrap): has topic + + self.mqtt.client().publish(msg).is_ok() + } - let props = [code.as_user_property()]; - let mut outgoing = Publication::new(path.as_bytes()) - .topic(&cache.topic) + fn subscribe(&mut self) -> bool { + let mut settings = self.prefix.clone(); + settings.push_str("/settings/#").unwrap(); + let opts = SubscriptionOptions::default().ignore_local_messages(); + let topics = [TopicFilter::new(&settings).options(opts)]; + self.mqtt.client().subscribe(&topics, &[]).is_ok() + } + + /// Force republication of the current settings. + /// + /// # Note + /// This is intended to be used if modification of a setting had side effects that affected + /// another setting. + pub fn publish(&mut self, path: &str) -> Result<(), Error> { + self.pending = Multipart::default().root(&Path::<_, SEPARATOR>::from(path))?; + self.state.process_event(sm::Events::Multipart)?; + Ok(()) + } + + fn iter_list(&mut self) { + while self.mqtt.client().can_publish(QoS::AtLeastOnce) { + let (code, path) = self + .pending + .iter + .next() + .map(|path| { + let (path, node) = path.unwrap(); // Note(unwrap) checked capacity + assert!(node.is_leaf()); // Note(assert): Iterator depth unlimited + (ResponseCode::Continue, path.into_inner()) + }) + .unwrap_or((ResponseCode::Ok, String::new())); + + let props = [code.into()]; + let mut response = Publication::new(path.as_bytes()) + .topic(self.pending.response_topic.as_ref().unwrap()) // Note(unwrap) checked in update() .properties(&props) .qos(QoS::AtLeastOnce); - if let Some(cd) = &cache.correlation_data { - outgoing = outgoing.correlate(cd); + if let Some(cd) = &self.pending.correlation_data { + response = response.correlate(cd); } - let publication = match outgoing.finish() { - Ok(response) => response, - Err(e) => { - // Something went wrong. Abort the listing. - log::error!("Listing failed to build response: {e:?}"); - self.listing_state.take(); - return; - } - }; - - // Note(unwrap) We already checked that we can publish earlier. - self.mqtt.client().publish(publication).unwrap(); + self.mqtt + .client() + .publish(response.finish().unwrap()) // Note(unwrap): has topic + .unwrap(); // Note(unwrap) checked can_publish() - // If we're done with listing, bail out of the loop. if code != ResponseCode::Continue { - self.listing_state.take(); + self.state.process_event(sm::Events::Complete).unwrap(); break; } } } - fn handle_republish(&mut self, settings: &Settings) { - while self.mqtt.client().can_publish(QoS::AtMostOnce) { - let Some(path) = self.state.context_mut().republish_state.next() else { - // If we got here, we completed iterating over the topics and published them all. + fn iter_dump(&mut self, settings: &Settings) { + while self.mqtt.client().can_publish(QoS::AtLeastOnce) { + let Some(path) = self.pending.iter.next() else { self.state.process_event(sm::Events::Complete).unwrap(); break; }; - let (path, _node) = path.unwrap(); + let (path, node) = path.unwrap(); // Note(unwraped): checked capacity + assert!(node.is_leaf()); // Note(assert): Iterator depth unlimited - let mut prefixed_topic = self.prefix.clone(); - prefixed_topic + let mut topic = self.prefix.clone(); + topic .push_str("/settings") - .and_then(|_| prefixed_topic.push_str(&path)) + .and_then(|_| topic.push_str(&path)) .unwrap(); - // If the topic is not present, we'll fail to serialize the setting into the - // payload and will never publish. The iterator has already incremented, so this is - // acceptable. - let response = DeferredPublication::new(|buf| settings.get_json_by_key(&path, buf)) - .topic(&prefixed_topic) - .finish() - .unwrap(); + let props = [ResponseCode::Ok.into()]; + let mut response = DeferredPublication::new(|buf| settings.get_json_by_key(&path, buf)) + .topic(&topic) + .properties(&props) + .qos(QoS::AtLeastOnce); - // Note(unwrap): This should not fail because `can_publish()` was checked before - // attempting this publish. - match self.mqtt.client().publish(response) { - Err(minimq::PubError::Serialization(Error::Traversal(Traversal::Absent(_)))) => {} + if let Some(cd) = &self.pending.correlation_data { + response = response.correlate(cd); + } + + // Note(unwrap): has topic + match self.mqtt.client().publish(response.finish().unwrap()) { + Err(minimq::PubError::Serialization(miniconf::Error::Traversal( + Traversal::Absent(_), + ))) => {} - // If the value is too large to serialize, print an error to the topic instead Err(minimq::PubError::Error(minimq::Error::Minimq( minimq::MinimqError::Protocol(minimq::ProtocolError::Serialization( minimq::SerError::InsufficientMemory, )), ))) => { + let props = [ResponseCode::Error.into()]; + let mut response = Publication::new(b"Serialized value too large") + .topic(&topic) + .properties(&props) + .qos(QoS::AtLeastOnce); + + if let Some(cd) = &self.pending.correlation_data { + response = response.correlate(cd); + } + self.mqtt .client() - .publish( - Publication::new(b"") - .topic(&prefixed_topic) - .properties(&[ResponseCode::Error.as_user_property()]) - .finish() - .unwrap(), - ) - .unwrap(); + .publish(response.finish().unwrap()) // Note(unwrap): has topic + .unwrap(); // Note(unwrap): checked can_publish, error message is short } other => other.unwrap(), } } } - fn handle_subscription(&mut self) { - log::info!("MQTT connected, subscribing to settings"); - - // Note(unwrap): We construct a string with two more characters than the prefix - // structure, so we are guaranteed to have space for storage. - let mut settings_topic = self.prefix.clone(); - settings_topic.push_str("/settings/#").unwrap(); - let mut list_topic = self.prefix.clone(); - list_topic.push_str("/list").unwrap(); - - let opts = SubscriptionOptions::default().ignore_local_messages(); - let topics = [ - TopicFilter::new(&settings_topic).options(opts), - TopicFilter::new(&list_topic).options(opts), - ]; - - if self.mqtt.client().subscribe(&topics, &[]).is_ok() { - self.state.process_event(sm::Events::Subscribed).unwrap(); - } - } - - fn handle_indicating_alive(&mut self) { - // Publish a connection status message. - let mut connection_topic = self.prefix.clone(); - connection_topic.push_str("/alive").unwrap(); - - if self - .mqtt - .client() + fn respond<'a, T: Display>( + response: T, + code: ResponseCode, + request: &Properties<'a>, + client: &mut minimq::mqtt_client::MqttClient<'buf, Stack, Clock, Broker>, + ) -> Result< + (), + minimq::PubError>, + > { + client .publish( - Publication::new(b"1") - .topic(&connection_topic) - .retain() - .finish() - .unwrap(), + DeferredPublication::new(|mut buf| { + let start = buf.len(); + write!(buf, "{}", response).and_then(|_| Ok(start - buf.len())) + }) + .reply(request) + .properties(&[code.into()]) + .qos(QoS::AtLeastOnce) + .finish() + .map_err(minimq::Error::from)?, ) - .is_ok() - { - self.state.process_event(sm::Events::IndicatedLife).unwrap(); - } - } - - /// Update the MQTT interface and service the network. - /// - /// # Returns - /// True if the settings changed. False otherwise. - pub fn update(&mut self, settings: &mut Settings) -> Result> { - if !self.mqtt.client().is_connected() { - // Note(unwrap): It's always safe to reset. - self.state.process_event(sm::Events::Reset).unwrap(); - } - - match *self.state.state() { - sm::States::Initial => { - if self.mqtt.client().is_connected() { - self.state.process_event(sm::Events::Connected).unwrap(); - } - } - sm::States::ConnectedToBroker => self.handle_indicating_alive(), - sm::States::PendingSubscribe => self.handle_subscription(), - sm::States::PendingRepublish => { - if self.state.context().republish_has_timed_out() { - self.state - .process_event(sm::Events::StartRepublish) - .unwrap(); - } - } - sm::States::RepublishingSettings => self.handle_republish(settings), - - // Nothing to do in the active state. - sm::States::Active => {} - } - - self.handle_listing(); - - // All states must handle MQTT traffic. - self.handle_mqtt_traffic(settings) + .map_err(|err| { + debug!("Response failure: {err:?}"); + err + }) } - fn handle_mqtt_traffic( - &mut self, - settings: &mut Settings, - ) -> Result> { - let mut updated = false; - let poll = self.mqtt.poll(|client, topic, message, properties| { - let Some(path) = topic.strip_prefix(self.prefix.as_str()) else { - log::info!("Unexpected topic prefix: {topic}"); - return; - }; - - let Ok(command) = Command::from_message(path, message) else { - log::info!("Unknown miniconf command: {path}"); - return; + fn poll(&mut self, settings: &mut Settings) -> Result> { + let Self { + mqtt, + state, + prefix, + pending, + } = self; + mqtt.poll(|client, topic, payload, properties| { + let Some(path) = topic + .strip_prefix(prefix.as_str()) + .and_then(|p| p.strip_prefix("/settings")) + else { + info!("Unexpected topic: {topic}"); + return Changed::Unchanged; }; - match command { - Command::List => { - if !properties - .into_iter() - .any(|prop| matches!(prop, Ok(minimq::Property::ResponseTopic(_)))) - { - log::info!("Discarding `List` without `ResponseTopic`"); - return; - } - - let response = match self.listing_state { - Some(_) => "`List` already in progress", - None => { - match handle_listing_request(properties) { - Err(msg) => msg, - Ok(cache) => { - self.listing_state.replace((cache, Settings::nodes())); - - // There is no positive response sent during list commands, - // instead, the response is sent as a property of the listed - // elements. As such, we are now finished processing a list - // command. - return; - } - } - } - }; - - let props = [ResponseCode::Error.as_user_property()]; - if let Ok(response) = minimq::Publication::new(response.as_bytes()) + if payload.is_empty() { + // Get, Dump, or List + // Try a Get assuming a leaf node + if let Err(err) = client.publish( + DeferredPublication::new(|buf| settings.get_json(path, buf)) + .topic(topic) .reply(properties) - .properties(&props) + .properties(&[ResponseCode::Ok.into()]) .qos(QoS::AtLeastOnce) .finish() - { - client.publish(response).ok(); - } - } - - Command::Get { path } => { - let props = [ResponseCode::Ok.as_user_property()]; - let Ok(message) = DeferredPublication::new(|buf| settings.get_json(path, buf)) - .properties(&props) - .reply(properties) - // Override the response topic with the path. - .qos(QoS::AtLeastOnce) - .finish() - else { - // If we can't create the publication, it's because there's no way to reply - // to the message. Since we don't know where to send things, abort now and - // complete handling of the `Get` request. - return; - }; - - if let Err(minimq::PubError::Serialization(err)) = client.publish(message) { - if let Ok(message) = DeferredPublication::new(|mut buf| { - let start = buf.len(); - write!(buf, "{}", err).and_then(|_| Ok(start - buf.len())) - }) - .properties(&[ResponseCode::Error.as_user_property()]) - .reply(properties) - .qos(QoS::AtLeastOnce) - .finish() - { - // Try to send the error as a best-effort. If we don't have enough - // buffer space to encode the error, there's nothing more we can do. - client.publish(message).ok(); - }; - } - } - - Command::Set { path, value } => match settings.set_json(path, value) { - Ok(_depth) => { - updated = true; - if let Ok(response) = Publication::new("OK".as_bytes()) - .properties(&[ResponseCode::Ok.as_user_property()]) - .reply(properties) - .qos(QoS::AtLeastOnce) - .finish() - { - client.publish(response).ok(); + .unwrap(), // Note(unwrap): has topic + ) { + match err { + minimq::PubError::Serialization(miniconf::Error::Traversal( + Traversal::TooShort(_depth), + )) => { + // Internal node: try Dump or List + (state.state() == &sm::States::Single) + .then_some(()) + .ok_or("Pending multipart response") + .and_then(|()| Multipart::::try_from(properties)) + .and_then(|m| { + m.root(&Path::<_, '/'>::from(path)).map_err(|err| { + debug!("List/Pub root: {err}"); + "Root not found" + }) + }) + .map_or_else( + |err| { + Self::respond(err, ResponseCode::Error, properties, client) + .ok(); + }, + |m| { + *pending = m; + state.process_event(sm::Events::Multipart).unwrap(); + // Response comes through iter_list/iter_dump + }, + ); } - } - Err(err) => { - if let Ok(response) = DeferredPublication::new(|mut buf| { - let start = buf.len(); - write!(buf, "{}", err).and_then(|_| Ok(start - buf.len())) - }) - .properties(&[ResponseCode::Error.as_user_property()]) - .reply(properties) - .qos(QoS::AtLeastOnce) - .finish() - { - client.publish(response).ok(); + minimq::PubError::Serialization(err) => { + Self::respond(err, ResponseCode::Error, properties, client).ok(); + } + minimq::PubError::Error(err) => { + error!("Get failure: {err:?}"); } } - }, + } + Changed::Unchanged + } else { + // Set + settings + .set_json(path, payload) + .map_err(|err| Self::respond(err, ResponseCode::Error, properties, client).ok()) + .map(|_depth| Self::respond("OK", ResponseCode::Ok, properties, client).ok()) + .is_ok() + .into() } - }); - match poll { - Ok(_) => Ok(updated), - Err(minimq::Error::SessionReset) => { - log::warn!("Session reset"); + }) + .map(Option::unwrap_or_default) + .or_else(|err| match err { + minimq::Error::SessionReset => { + warn!("Session reset"); self.state.process_event(sm::Events::Reset).unwrap(); - Ok(false) + Ok(Changed::Unchanged) } - Err(other) => Err(other), - } - } - - /// Force republication of the current settings. - /// - /// # Note - /// This is intended to be used if modification of a setting had side effects that affected - /// another setting. - pub fn force_republish(&mut self) { - self.state.process_event(sm::Events::StartRepublish).ok(); + other => Err(other.into()), + }) } } -#[derive(Debug, Copy, Clone, PartialEq)] -enum ResponseCode { - Ok, - Continue, - Error, +#[derive(Default, Copy, Clone, PartialEq, PartialOrd)] +enum Changed { + #[default] + Unchanged, + Changed, } -impl ResponseCode { - const fn as_user_property(self) -> minimq::Property<'static> { - let string = match self { - ResponseCode::Ok => "Ok", - ResponseCode::Continue => "Continue", - ResponseCode::Error => "Error", - }; - - minimq::Property::UserProperty( - minimq::types::Utf8String("code"), - minimq::types::Utf8String(string), - ) - } -} - -fn handle_listing_request( - properties: &minimq::types::Properties<'_>, -) -> Result { - // If the response topic is too long, send an error - let response_topic = properties.into_iter().response_topic().unwrap(); - - // If there is a CD and it's too long, send an error response. - let correlation_data = if let Some(cd) = properties.into_iter().find_map(|prop| { - if let Ok(minimq::Property::CorrelationData(cd)) = prop { - Some(cd.0) +impl From for Changed { + fn from(value: bool) -> Self { + if value { + Self::Changed } else { - None + Self::Unchanged } - }) { - Some(Vec::try_from(cd).map_err(|_| "Correlation data too long")?) - } else { - None - }; - - Ok(ListCache { - topic: String::try_from(response_topic).map_err(|_| "Response topic too long")?, - correlation_data, - }) + } } diff --git a/miniconf_mqtt/tests/integration_test.rs b/miniconf_mqtt/tests/integration_test.rs index 400d9266..4f33dd9e 100644 --- a/miniconf_mqtt/tests/integration_test.rs +++ b/miniconf_mqtt/tests/integration_test.rs @@ -81,24 +81,21 @@ fn main() -> std::io::Result<()> { // Construct a Minimq client to the broker for publishing requests. let mut buffer = [0u8; 1024]; let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap(); - let mut mqtt: minimq::Minimq<'_, _, _, minimq::broker::IpBroker> = minimq::Minimq::new( + let mut mqtt = minimq::Minimq::new( Stack, StandardClock::default(), - minimq::ConfigBuilder::new(localhost.into(), &mut buffer), + minimq::ConfigBuilder::::new(localhost.into(), &mut buffer), ); let mut buffer = [0u8; 1024]; - let localhost: minimq::embedded_nal::IpAddr = "127.0.0.1".parse().unwrap(); - // Construct a settings configuration interface. - let mut interface: miniconf_mqtt::MqttClient<'_, _, _, _, minimq::broker::IpBroker, 2> = - miniconf_mqtt::MqttClient::new( - Stack, - "device", - StandardClock::default(), - minimq::ConfigBuilder::new(localhost.into(), &mut buffer), - ) - .unwrap(); + let mut interface = miniconf_mqtt::MqttClient::new( + Stack, + "device", + StandardClock::default(), + minimq::ConfigBuilder::::new(localhost.into(), &mut buffer), + ) + .unwrap(); // We will wait 100ms in between each state to allow the MQTT broker to catch up let mut state = TestState::started(); diff --git a/py/miniconf-mqtt/miniconf/__main__.py b/py/miniconf-mqtt/miniconf/__main__.py index b4189c9f..f34a893e 100644 --- a/py/miniconf-mqtt/miniconf/__main__.py +++ b/py/miniconf-mqtt/miniconf/__main__.py @@ -28,10 +28,12 @@ def main(): description="Miniconf command line interface.", formatter_class=argparse.RawDescriptionHelpFormatter, epilog="""Examples: -%(prog)s dt/sinara/dual-iir/01-02-03-04-05-06 '/stream_target="192.0.2.16:9293"' +%(prog)s dt/sinara/dual-iir/01-02-03-04-05-06 '/stream="192.0.2.16:9293"' %(prog)s -d dt/sinara/dual-iir/+ '/afe/0' # GET %(prog)s -d dt/sinara/dual-iir/+ '/afe/0="G1"' # SET %(prog)s -d dt/sinara/dual-iir/+ '/afe/0=' # CLEAR +%(prog)s -d dt/sinara/dual-iir/+ '/afe?' '?' # LIST-GET +%(prog)s -d dt/sinara/dual-iir/+ '/afe!' # DUMP """, ) parser.add_argument( @@ -50,24 +52,19 @@ def main(): parser.add_argument( "--discover", "-d", action="store_true", help="Detect and list device prefixes" ) - parser.add_argument( - "--list", - "-l", - action="store_true", - help="List all active settings after modification", - ) parser.add_argument( "prefix", type=str, help="The MQTT topic prefix of the target or a prefix filter for discovery", ) parser.add_argument( - "paths", + "commands", metavar="CMD", nargs="*", help="Path to get ('PATH') or path and JSON encoded value to set " - "('PATH=VALUE') or path to clear ('PATH='). " - "Use sufficient shell escaping.", + "('PATH=VALUE') or path to clear ('PATH=') or path to list (`PATH?`) or " + "path to dump (`PATH!`). " + "Use sufficient shell escaping.", ) args = parser.parse_args() @@ -84,8 +81,8 @@ async def run(): devices = await discover(client, args.prefix) if len(devices) != 1: raise MiniconfException( - f"No unique Miniconf device (found `{devices}`). " - "Please specify a `--prefix`" + "Discover", + f"No unique Miniconf device (found `{devices}`)." ) prefix = devices.pop() logging.info("Found device prefix: %s", prefix) @@ -94,26 +91,35 @@ async def run(): interface = Miniconf(client, prefix) - for arg in args.paths: - assert (not arg) or arg.startswith("/") - try: + for arg in args.commands: + if arg.endswith("?"): + path = arg.removesuffix("?") + assert path.startswith("/") or not path + for p in await interface.list_paths(path): + try: + value = await interface.get(p) + print(f"List `{p}` = `{value}`") + except MiniconfException as err: + print(f"List `{p}`: {repr(err)}") + elif arg.endswith("!"): + path = arg.removesuffix("!") + assert path.startswith("/") or not path + await interface.dump(path) + print(f"Dumped `{path}` into namespace") + elif "=" in arg: path, value = arg.split("=", 1) - except ValueError: - value = await interface.get(arg) - print(f"{arg} = {value}") - else: + assert path.startswith("/") or not path if not value: await interface.clear(path) - print(f"Cleared retained {path}: OK") - + print(f"Cleared retained `{path}`") else: await interface.set(path, json.loads(value), args.retain) - print(f"Set {path} to {value}: OK") - - if args.list: - for path in await interface.list_paths(): + print(f"Set `{path}` = `{value}`") + else: + path = arg + assert path.startswith("/") or not path value = await interface.get(path) - print(f"{path} = {value}") + print(f"Get `{path}` = `{value}`") asyncio.run(run()) diff --git a/py/miniconf-mqtt/miniconf/miniconf.py b/py/miniconf-mqtt/miniconf/miniconf.py index 1ed1cfa0..1272ca6d 100644 --- a/py/miniconf-mqtt/miniconf/miniconf.py +++ b/py/miniconf-mqtt/miniconf/miniconf.py @@ -22,6 +22,13 @@ class MiniconfException(Exception): """Generic exceptions generated by Miniconf.""" + def __init__(self, code, message): + self.code = code + self.message = message + + def __repr__(self): + return f"Code: {self.code}, Message: {self.message}" + class Miniconf: """An asynchronous API for controlling Miniconf devices using MQTT.""" @@ -102,29 +109,29 @@ def _dispatch(self, message: Message): ret.append(response) fut.set_result(ret) else: - fut.set_exception( - MiniconfException(f"Received code: {code}, Message: {response}") - ) + fut.set_exception(MiniconfException(code, response)) del self._inflight[response_id] - async def _do(self, topic: str, **kwargs): + async def _do(self, topic: str, *, response=True, **kwargs): await self.subscribed.wait() - request_id = uuid.uuid1().bytes - fut = asyncio.get_event_loop().create_future() - assert request_id not in self._inflight - self._inflight[request_id] = fut, [] - props = Properties(PacketTypes.PUBLISH) - props.ResponseTopic = self.response_topic + request_id = uuid.uuid1().bytes props.CorrelationData = request_id + if response: + fut = asyncio.get_event_loop().create_future() + assert request_id not in self._inflight + self._inflight[request_id] = fut, [] + props.ResponseTopic = self.response_topic + LOGGER.info(f"Publishing {topic}: {kwargs['payload']}, [{props}]") await self.client.publish( topic, properties=props, **kwargs, ) - return await fut + if response: + return await fut async def set(self, path, value, retain=False): """Write the provided data to the specified path. @@ -139,12 +146,30 @@ async def set(self, path, value, retain=False): payload=json.dumps(value, separators=(",", ":")), retain=retain, ) - assert len(ret) == 1 + assert len(ret) == 1, ret return ret[0] - async def list_paths(self): - """Get a list of all the paths available on the device.""" - return await self._do(topic=f"{self.prefix}/list", payload="") + async def list_paths(self, root=""): + """Get a list of all the paths below a given root. + + Args: + root: Path to the root node to list. + """ + return await self._do(topic=f"{self.prefix}/settings{root}", payload="") + + async def dump(self, root=""): + """Dump all the paths at or below a given root into the settings namespace. + + Note that the target Miniconf client may be unable to + respond to messages when a multipart operation (list or dump) is in progress. + This method does not wait for the completion of the dump. + + Args: + root: Path to the root node to dump. + """ + await self._do( + topic=f"{self.prefix}/settings{root}", payload="", response=False + ) async def get(self, path): """Get the specific value of a given path. @@ -153,7 +178,7 @@ async def get(self, path): path: The path to get. """ ret = await self._do(topic=f"{self.prefix}/settings{path}", payload="") - assert len(ret) == 1 + assert len(ret) == 1, ret return ret[0] async def clear(self, path): @@ -163,5 +188,5 @@ async def clear(self, path): path: The path to get. """ ret = await self._do(f"{self.prefix}/settings{path}", payload="", retain=True) - assert len(ret) == 1 + assert len(ret) == 1, ret return ret[0] diff --git a/py/miniconf-mqtt/pyproject.toml b/py/miniconf-mqtt/pyproject.toml index 076b8ed1..2d9e24ae 100644 --- a/py/miniconf-mqtt/pyproject.toml +++ b/py/miniconf-mqtt/pyproject.toml @@ -14,9 +14,7 @@ authors = [ { name = "Ryan Summers", email = "ryan.summers@vertigo-designs.com" }, { name = "Robert Jördens", email = "rj@quartiq.de" }, ] -dependencies = [ - "aiomqtt >= 2.2.0", -] +dependencies = ["aiomqtt >= 2.2.0", "typing_extensions >= 4.12"] [project.urls] Homepage = "https://github.com/quartiq/miniconf" diff --git a/py/test.sh b/py/test.sh new file mode 100644 index 00000000..473a38e0 --- /dev/null +++ b/py/test.sh @@ -0,0 +1,22 @@ +#!/bin/sh + +set -e +set -x + +python -m venv .venv +. .venv/bin/activate +python -m pip install -e py/miniconf-mqtt + +cargo build -p miniconf_mqtt --example mqtt +cargo run -p miniconf_mqtt --example mqtt & +sleep 3 # > REPUBLISH_TIMEOUT_SECONDS + +python -m miniconf -b localhost dt/sinara/dual-iir/01-02-03-04-05-06 '/stream="192.0.2.16:9293"' +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0' # GET +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0="G1"' # SET +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe/0=' # CLEAR +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe?' '?' # LIST-GET +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/afe!' # DUMP +sleep 1 # dump is asynchronous + +python -m miniconf -b localhost -d dt/sinara/dual-iir/+ '/exit=true'